You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/12/21 18:04:20 UTC
[2/2] hadoop git commit: YARN-5976. Update hbase version to 1.2.
Contributed by Vrushali C.
YARN-5976. Update hbase version to 1.2. Contributed by Vrushali C.
(cherry picked from commit f945008d1cf5730bdebeae501ed0e42477ad219e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9bcfbf5e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9bcfbf5e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9bcfbf5e
Branch: refs/heads/YARN-5355-branch-2
Commit: 9bcfbf5ec172715c652d7dfe4900fd73631139bb
Parents: cf8e3a8
Author: Sangjin Lee <sj...@apache.org>
Authored: Wed Dec 21 09:53:07 2016 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Wed Dec 21 10:04:03 2016 -0800
----------------------------------------------------------------------
LICENSE.txt | 8 +-
hadoop-project/pom.xml | 26 +-
.../pom.xml | 142 +-------
...TestPhoenixOfflineAggregationWriterImpl.java | 161 ---------
.../hadoop-yarn-server-timelineservice/pom.xml | 26 +-
.../PhoenixOfflineAggregationWriterImpl.java | 358 -------------------
.../storage/TimelineSchemaCreator.java | 22 --
7 files changed, 18 insertions(+), 725 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index ee5d528..fee4ae4 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1542,12 +1542,6 @@ JLine 0.9.94
leveldbjni-all 1.8
Hamcrest Core 1.3
xmlenc Library 0.52
-StringTemplate 4 4.0.7
-ANTLR 3 Tool 3.5
-ANTLR 3 Runtime 3.5
-ANTLR StringTemplate 3.2.1
-ASM All 5.0.2
-sqlline 1.1.8
--------------------------------------------------------------------------------
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
@@ -1778,7 +1772,7 @@ the Licensor and You.
The binary distribution of this product bundles these dependencies under the
following license:
-jamon-runtime 2.3.1
+jamon-runtime 2.4.1
--------------------------------------------------------------------------------
MOZILLA PUBLIC LICENSE
Version 1.1
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 771d3e4..6c6de28 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -50,8 +50,7 @@
<kafka.version>0.8.2.1</kafka.version>
- <hbase.version>1.1.3</hbase.version>
- <phoenix.version>4.7.0-HBase-1.1</phoenix.version>
+ <hbase.version>1.2.4</hbase.version>
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
@@ -1095,29 +1094,6 @@
<classifier>tests</classifier>
</dependency>
<dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <version>${phoenix.version}</version>
- <exclusions>
- <!-- Exclude jline from here -->
- <exclusion>
- <artifactId>jline</artifactId>
- <groupId>jline</groupId>
- </exclusion>
- <exclusion>
- <artifactId>joda-time</artifactId>
- <groupId>joda-time</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <type>test-jar</type>
- <version>${phoenix.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>${hbase.version}</version>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index c627112..ed014de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -56,10 +56,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- </exclusion>
</exclusions>
</dependency>
@@ -76,6 +72,8 @@
</exclusions>
</dependency>
+ <!-- 'mvn dependency:analyze' fails to detect use of this direct
+ dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
@@ -111,20 +109,6 @@
</exclusions>
</dependency>
- <!-- 'mvn dependency:analyze' fails to detect use of this direct
- dependency -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-common</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
@@ -145,14 +129,14 @@
<dependency>
<groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
+ <artifactId>jersey-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- <scope>test</scope>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ <version>1.1.1</version>
</dependency>
<dependency>
@@ -225,23 +209,6 @@
<dependency>
<groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
@@ -276,99 +243,6 @@
<!-- 'mvn dependency:analyze' fails to detect use of this direct
dependency -->
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-it</artifactId>
- <scope>test</scope>
- <classifier>tests</classifier>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sourceforge.findbugs</groupId>
- <artifactId>annotations</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sourceforge.findbugs</groupId>
- <artifactId>annotations</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- for runtime dependencies -->
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hbase-compatible-hadoop.version}</version>
@@ -382,6 +256,8 @@
</exclusions>
</dependency>
+ <!-- 'mvn dependency:analyze' fails to detect use of this direct
+ dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -389,6 +265,8 @@
<scope>test</scope>
</dependency>
+ <!-- 'mvn dependency:analyze' fails to detect use of this direct
+ dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
deleted file mode 100644
index e34ae90..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.ReadOnlyProps;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-
-public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
- private static PhoenixOfflineAggregationWriterImpl storage;
- private static final int BATCH_SIZE = 3;
-
- @BeforeClass
- public static void setup() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- storage = setupPhoenixClusterAndWriterForTest(conf);
- }
-
- @Test(timeout = 90000)
- public void testFlowLevelAggregationStorage() throws Exception {
- testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
- }
-
- @Test(timeout = 90000)
- public void testUserLevelAggregationStorage() throws Exception {
- testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
- }
-
- @AfterClass
- public static void cleanup() throws Exception {
- storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
- storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
- tearDownMiniCluster();
- }
-
- private static PhoenixOfflineAggregationWriterImpl
- setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
- throws Exception {
- Map<String, String> props = new HashMap<>();
- // Must update config before starting server
- props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
- Boolean.FALSE.toString());
- props.put("java.security.krb5.realm", "");
- props.put("java.security.krb5.kdc", "");
- props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
- Boolean.FALSE.toString());
- props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
- props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
- // Make a small batch size to test multiple calls to reserve sequences
- props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
- Long.toString(BATCH_SIZE));
- // Must update config before starting server
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-
- // Change connection settings for test
- conf.set(
- YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
- getUrl());
- PhoenixOfflineAggregationWriterImpl
- myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
- myWriter.init(conf);
- myWriter.start();
- myWriter.createPhoenixTables();
- return myWriter;
- }
-
- private static TimelineEntity getTestAggregationTimelineEntity() {
- TimelineEntity entity = new TimelineEntity();
- String id = "hello1";
- String type = "testAggregationType";
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(1425016501000L);
-
- TimelineMetric metric = new TimelineMetric();
- metric.setId("HDFS_BYTES_READ");
- metric.addValue(1425016501100L, 8000);
- entity.addMetric(metric);
-
- return entity;
- }
-
- private void testAggregator(OfflineAggregationInfo aggregationInfo)
- throws Exception {
- // Set up a list of timeline entities and write them back to Phoenix
- int numEntity = 1;
- TimelineEntities te = new TimelineEntities();
- te.addEntity(getTestAggregationTimelineEntity());
- TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
- "user1", "testFlow", null, 0L, null);
- storage.writeAggregatedEntity(context, te,
- aggregationInfo);
-
- // Verify if we're storing all entities
- String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
- String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
- +") FROM " + aggregationInfo.getTableName();
- verifySQLWithCount(sql, numEntity, "Number of entities should be ");
- // Check metric
- sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
- + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
- verifySQLWithCount(sql, numEntity,
- "Number of entities with info should be ");
- }
-
-
- private void verifySQLWithCount(String sql, int targetCount, String message)
- throws Exception {
- try (
- Statement stmt =
- storage.getConnection().createStatement();
- ResultSet rs = stmt.executeQuery(sql)) {
- assertTrue("Result set empty on statement " + sql, rs.next());
- assertNotNull("Fail to execute query " + sql, rs);
- assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
- } catch (SQLException se) {
- fail("SQL exception on query: " + sql
- + " With exception message: " + se.getLocalizedMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 1eca09f..7a5e38a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -93,11 +93,6 @@
<dependency>
<groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
@@ -132,6 +127,12 @@
</dependency>
<dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<exclusions>
@@ -176,21 +177,6 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sourceforge.findbugs</groupId>
- <artifactId>annotations</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
deleted file mode 100644
index 130cb6c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.phoenix.util.PropertiesUtil;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Offline aggregation Phoenix storage. This storage currently consists of two
- * aggregation tables, one for flow level aggregation and one for user level
- * aggregation.
- *
- * Example table record:
- *
- * <pre>
- * |---------------------------|
- * | Primary | Column Family|
- * | key | metrics |
- * |---------------------------|
- * | row_key | metricId1: |
- * | | metricValue1 |
- * | | @timestamp1 |
- * | | |
- * | | metriciD1: |
- * | | metricValue2 |
- * | | @timestamp2 |
- * | | |
- * | | metricId2: |
- * | | metricValue1 |
- * | | @timestamp2 |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * | | |
- * |---------------------------|
- * </pre>
- *
- * For the flow aggregation table, the primary key contains user, cluster, and
- * flow id. For user aggregation table,the primary key is user.
- *
- * Metrics column family stores all aggregated metrics for each record.
- */
-@Private
-@Unstable
-public class PhoenixOfflineAggregationWriterImpl
- extends OfflineAggregationWriter {
-
- private static final Log LOG
- = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
- private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
- = "timeline_cf_placeholder";
-
- /** Default Phoenix JDBC driver name. */
- private static final String DRIVER_CLASS_NAME
- = "org.apache.phoenix.jdbc.PhoenixDriver";
-
- /** Default Phoenix timeline config column family. */
- private static final String METRIC_COLUMN_FAMILY = "m.";
- /** Default Phoenix timeline info column family. */
- private static final String INFO_COLUMN_FAMILY = "i.";
- /** Default separator for Phoenix storage. */
- private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
-
- /** Connection string to the deployed Phoenix cluster. */
- private String connString = null;
- private Properties connProperties = new Properties();
-
- public PhoenixOfflineAggregationWriterImpl(Properties prop) {
- super(PhoenixOfflineAggregationWriterImpl.class.getName());
- connProperties = PropertiesUtil.deepCopy(prop);
- }
-
- public PhoenixOfflineAggregationWriterImpl() {
- super(PhoenixOfflineAggregationWriterImpl.class.getName());
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- Class.forName(DRIVER_CLASS_NAME);
- // so check it here and only read in the config if it's not overridden.
- connString =
- conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
- YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
- super.init(conf);
- }
-
- @Override
- public TimelineWriteResponse writeAggregatedEntity(
- TimelineCollectorContext context, TimelineEntities entities,
- OfflineAggregationInfo info) throws IOException {
- TimelineWriteResponse response = new TimelineWriteResponse();
- String sql = "UPSERT INTO " + info.getTableName()
- + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
- + ", created_time, metric_names) "
- + "VALUES ("
- + StringUtils.repeat("?,", info.getPrimaryKeyList().length)
- + "?, ?)";
- if (LOG.isDebugEnabled()) {
- LOG.debug("TimelineEntity write SQL: " + sql);
- }
-
- try (Connection conn = getConnection();
- PreparedStatement ps = conn.prepareStatement(sql)) {
- for (TimelineEntity entity : entities.getEntities()) {
- HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
- if (entity.getMetrics() != null) {
- for (TimelineMetric m : entity.getMetrics()) {
- formattedMetrics.put(m.getId(), m);
- }
- }
- int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
- ps.setLong(idx++, entity.getCreatedTime());
- ps.setString(idx++,
- StringUtils.join(formattedMetrics.keySet().toArray(),
- AGGREGATION_STORAGE_SEPARATOR));
- ps.execute();
-
- storeEntityVariableLengthFields(entity, formattedMetrics, context, conn,
- info);
-
- conn.commit();
- }
- } catch (SQLException se) {
- LOG.error("Failed to add entity to Phoenix " + se.getMessage());
- throw new IOException(se);
- } catch (Exception e) {
- LOG.error("Exception on getting connection: " + e.getMessage());
- throw new IOException(e);
- }
- return response;
- }
-
- /**
- * Create Phoenix tables for offline aggregation storage if the tables do not
- * exist.
- *
- * @throws IOException if any problem happens while creating Phoenix tables.
- */
- public void createPhoenixTables() throws IOException {
- // Create tables if necessary
- try (Connection conn = getConnection();
- Statement stmt = conn.createStatement()) {
- // Table schema defined as in YARN-3817.
- String sql = "CREATE TABLE IF NOT EXISTS "
- + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
- + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
- + "flow_name VARCHAR NOT NULL, "
- + "created_time UNSIGNED_LONG, "
- + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
- + " VARBINARY, "
- + "metric_names VARCHAR, info_keys VARCHAR "
- + "CONSTRAINT pk PRIMARY KEY("
- + "user, cluster, flow_name))";
- stmt.executeUpdate(sql);
- sql = "CREATE TABLE IF NOT EXISTS "
- + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
- + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
- + "created_time UNSIGNED_LONG, "
- + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
- + " VARBINARY, "
- + "metric_names VARCHAR, info_keys VARCHAR "
- + "CONSTRAINT pk PRIMARY KEY(user, cluster))";
- stmt.executeUpdate(sql);
- conn.commit();
- } catch (SQLException se) {
- LOG.error("Failed in init data " + se.getLocalizedMessage());
- throw new IOException(se);
- }
- return;
- }
-
- // Utility functions
- @Private
- @VisibleForTesting
- Connection getConnection() throws IOException {
- Connection conn;
- try {
- conn = DriverManager.getConnection(connString, connProperties);
- conn.setAutoCommit(false);
- } catch (SQLException se) {
- LOG.error("Failed to connect to phoenix server! "
- + se.getLocalizedMessage());
- throw new IOException(se);
- }
- return conn;
- }
-
- // WARNING: This method will permanently drop a table!
- @Private
- @VisibleForTesting
- void dropTable(String tableName) throws Exception {
- try (Connection conn = getConnection();
- Statement stmt = conn.createStatement()) {
- String sql = "DROP TABLE " + tableName;
- stmt.executeUpdate(sql);
- } catch (SQLException se) {
- LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
- throw se;
- }
- }
-
- private static class DynamicColumns<K> {
- static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
- static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
- private String columnFamilyPrefix;
- private String type;
- private Set<K> columns;
-
- public DynamicColumns(String columnFamilyPrefix, String type,
- Set<K> keyValues) {
- this.columnFamilyPrefix = columnFamilyPrefix;
- this.columns = keyValues;
- this.type = type;
- }
- }
-
- private static <K> StringBuilder appendColumnsSQL(
- StringBuilder colNames, DynamicColumns<K> cfInfo) {
- // Prepare the sql template by iterating through all keys
- for (K key : cfInfo.columns) {
- colNames.append(",").append(cfInfo.columnFamilyPrefix)
- .append(key.toString()).append(cfInfo.type);
- }
- return colNames;
- }
-
- private static <K, V> int setValuesForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos,
- boolean converToBytes) throws SQLException {
- int idx = startPos;
- for (Map.Entry<K, V> entry : keyValues.entrySet()) {
- V value = entry.getValue();
- if (value instanceof Collection) {
- ps.setString(idx++, StringUtils.join(
- (Collection) value, AGGREGATION_STORAGE_SEPARATOR));
- } else {
- if (converToBytes) {
- try {
- ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
- } catch (IOException ie) {
- LOG.error("Exception in converting values into bytes "
- + ie.getMessage());
- throw new SQLException(ie);
- }
- } else {
- ps.setString(idx++, value.toString());
- }
- }
- }
- return idx;
- }
-
- private static <K, V> int setBytesForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos)
- throws SQLException {
- return setValuesForColumnFamily(ps, keyValues, startPos, true);
- }
-
- private static <K, V> int setStringsForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos)
- throws SQLException {
- return setValuesForColumnFamily(ps, keyValues, startPos, false);
- }
-
- private static void storeEntityVariableLengthFields(TimelineEntity entity,
- Map<String, TimelineMetric> formattedMetrics,
- TimelineCollectorContext context, Connection conn,
- OfflineAggregationInfo aggregationInfo) throws SQLException {
- int numPlaceholders = 0;
- StringBuilder columnDefs = new StringBuilder(
- StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
- if (formattedMetrics != null && formattedMetrics.size() > 0) {
- appendColumnsSQL(columnDefs, new DynamicColumns<>(
- METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
- formattedMetrics.keySet()));
- numPlaceholders += formattedMetrics.keySet().size();
- }
- if (numPlaceholders == 0) {
- return;
- }
- StringBuilder placeholders = new StringBuilder();
- placeholders.append(
- StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
- // numPlaceholders >= 1 now
- placeholders.append("?")
- .append(StringUtils.repeat(",?", numPlaceholders - 1));
- String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
- .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
- .append(") VALUES(").append(placeholders).append(")").toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL statement for variable length fields: "
- + sqlVariableLengthFields);
- }
- // Use try with resource statement for the prepared statement
- try (PreparedStatement psVariableLengthFields =
- conn.prepareStatement(sqlVariableLengthFields)) {
- int idx = aggregationInfo.setStringsForPrimaryKey(
- psVariableLengthFields, context, null, 1);
- if (formattedMetrics != null && formattedMetrics.size() > 0) {
- idx = setBytesForColumnFamily(
- psVariableLengthFields, formattedMetrics, idx);
- }
- psVariableLengthFields.execute();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index fa0d479..dd87169 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -59,7 +59,6 @@ public final class TimelineSchemaCreator {
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
- private static final String PHOENIX_OPTION_SHORT = "p";
private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
private static final String APP_TABLE_NAME_SHORT = "a";
private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
@@ -117,22 +116,6 @@ public final class TimelineSchemaCreator {
exceptions.add(e);
}
- // Create Phoenix data schema if needed
- if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
- Configuration phoenixConf = new Configuration();
- try {
- PhoenixOfflineAggregationWriterImpl phoenixWriter =
- new PhoenixOfflineAggregationWriterImpl();
- phoenixWriter.init(phoenixConf);
- phoenixWriter.start();
- phoenixWriter.createPhoenixTables();
- phoenixWriter.stop();
- LOG.info("Successfully created Phoenix offline aggregation schema. ");
- } catch (IOException e) {
- LOG.error("Error in creating phoenix tables: " + e.getMessage());
- exceptions.add(e);
- }
- }
if (exceptions.size() > 0) {
LOG.warn("Schema creation finished with the following exceptions");
for (Exception e : exceptions) {
@@ -182,11 +165,6 @@ public final class TimelineSchemaCreator {
// Options without an argument
// No need to set arg name since we do not need an argument here
- o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
- "create Phoenix offline aggregation tables");
- o.setRequired(false);
- options.addOption(o);
-
o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
false, "skip existing Hbase tables and continue to create new tables");
o.setRequired(false);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org