You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:22 UTC
[05/50] [abbrv] hadoop git commit: YARN-3904. Refactor
timelineservice.storage to add support to online and offline aggregation
writers (Li Lu via sjlee)
YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a87a00ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a87a00ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a87a00ee
Branch: refs/heads/feature-YARN-2928
Commit: a87a00ee68ee929b8a297c97dad0999e800a9f59
Parents: 557fd5e
Author: Sangjin Lee <sj...@apache.org>
Authored: Mon Aug 17 16:48:58 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:37:47 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../dev-support/findbugs-exclude.xml | 7 +-
.../hadoop/yarn/conf/YarnConfiguration.java | 10 +
.../storage/OfflineAggregationWriter.java | 66 +++
.../PhoenixOfflineAggregationWriterImpl.java | 356 +++++++++++++
.../storage/PhoenixTimelineWriterImpl.java | 530 -------------------
.../storage/TimelineSchemaCreator.java | 45 +-
.../storage/common/OfflineAggregationInfo.java | 110 ++++
...TestPhoenixOfflineAggregationWriterImpl.java | 162 ++++++
.../storage/TestPhoenixTimelineWriterImpl.java | 152 ------
.../storage/TestTimelineWriterImpl.java | 74 ---
11 files changed, 754 insertions(+), 761 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c3a8172..42da97b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -88,6 +88,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3906. Split the application table from the entity table. (Sangjin Lee
via junping_du)
+ YARN-3904. Refactor timelineservice.storage to add support to online and
+ offline aggregation writers (Li Lu via sjlee)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 0dcdd15..d36f245 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -499,13 +499,12 @@
<!-- Ignore SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING warnings for Timeline Phoenix storage. -->
<!-- Since we're using dynamic columns, we have to generate SQL statements dynamically -->
<Match>
- <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixTimelineWriterImpl" />
+ <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixOfflineAggregationWriterImpl" />
<Or>
<Method name="storeEntityVariableLengthFields" />
- <Method name="storeEvents" />
- <Method name="storeMetrics" />
- <Method name="write" />
+ <Method name="writeAggregatedEntity" />
</Or>
+ <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING" />
</Match>
<!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs -->
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b8281b0..2946240 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2005,6 +2005,16 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME =
7*24*60*60*1000; // 7 days
+ // Timeline service v2 offlien aggregation related keys
+ public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline.";
+ public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR
+ = TIMELINE_OFFLINE_AGGREGATION_PREFIX
+ + "phoenix.connectionString";
+
+ public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT
+ = "jdbc:phoenix:localhost:2181:/hbase";
+
// ///////////////////////////////
// Shared Cache Configs
// ///////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
new file mode 100644
index 0000000..e1219e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+
+import java.io.IOException;
+
+/**
+ * YARN timeline service v2 offline aggregation storage interface
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class OfflineAggregationWriter extends AbstractService {
+
+ /**
+ * Construct the offline writer.
+ *
+ * @param name service name
+ */
+ public OfflineAggregationWriter(String name) {
+ super(name);
+ }
+
+ /**
+ * Persist aggregated timeline entities to the offline store based on which
+ * track this entity is to be rolled up to. The tracks along which aggregations
+ * are to be done are given by {@link OfflineAggregationInfo}.
+ *
+ * @param context a {@link TimelineCollectorContext} object that describes the
+ * context information of the aggregated data. Depends on the
+ * type of the aggregation, some fields of this context maybe
+ * empty or null.
+ * @param entities {@link TimelineEntities} to be persisted.
+ * @param info an {@link OfflineAggregationInfo} object that describes the
+ * detail of the aggregation. Current supported option is
+ * {@link OfflineAggregationInfo#FLOW_AGGREGATION}.
+ * @return a {@link TimelineWriteResponse} object.
+ * @throws IOException
+ */
+ abstract TimelineWriteResponse writeAggregatedEntity(
+ TimelineCollectorContext context,
+ TimelineEntities entities, OfflineAggregationInfo info) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
new file mode 100644
index 0000000..4c1352c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.phoenix.util.PropertiesUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Offline aggregation Phoenix storage. This storage currently consists of two
+ * aggregation tables, one for flow level aggregation and one for user level
+ * aggregation.
+ *
+ * Example table record:
+ *
+ * <pre>
+ * |---------------------------|
+ * | Primary | Column Family|
+ * | key | metrics |
+ * |---------------------------|
+ * | row_key | metricId1: |
+ * | | metricValue1 |
+ * | | @timestamp1 |
+ * | | |
+ * | | metriciD1: |
+ * | | metricValue2 |
+ * | | @timestamp2 |
+ * | | |
+ * | | metricId2: |
+ * | | metricValue1 |
+ * | | @timestamp2 |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * | | |
+ * |---------------------------|
+ * </pre>
+ *
+ * For the flow aggregation table, the primary key contains user, cluster, and
+ * flow id. For user aggregation table,the primary key is user.
+ *
+ * Metrics column family stores all aggregated metrics for each record.
+ */
+@Private
+@Unstable
+public class PhoenixOfflineAggregationWriterImpl
+ extends OfflineAggregationWriter {
+
+ private static final Log LOG
+ = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
+ private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
+ = "timeline_cf_placeholder";
+
+ /** Default Phoenix JDBC driver name */
+ private static final String DRIVER_CLASS_NAME
+ = "org.apache.phoenix.jdbc.PhoenixDriver";
+
+ /** Default Phoenix timeline config column family */
+ private static final String METRIC_COLUMN_FAMILY = "m.";
+ /** Default Phoenix timeline info column family */
+ private static final String INFO_COLUMN_FAMILY = "i.";
+ /** Default separator for Phoenix storage */
+ private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
+
+ /** Connection string to the deployed Phoenix cluster */
+ private String connString = null;
+ private Properties connProperties = new Properties();
+
+ public PhoenixOfflineAggregationWriterImpl(Properties prop) {
+ super(PhoenixOfflineAggregationWriterImpl.class.getName());
+ connProperties = PropertiesUtil.deepCopy(prop);
+ }
+
+ public PhoenixOfflineAggregationWriterImpl() {
+ super(PhoenixOfflineAggregationWriterImpl.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ Class.forName(DRIVER_CLASS_NAME);
+ // so check it here and only read in the config if it's not overridden.
+ connString =
+ conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
+ YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
+ super.init(conf);
+ }
+
+ @Override
+ public TimelineWriteResponse writeAggregatedEntity(
+ TimelineCollectorContext context, TimelineEntities entities,
+ OfflineAggregationInfo info) throws IOException {
+ TimelineWriteResponse response = new TimelineWriteResponse();
+ String sql = "UPSERT INTO " + info.getTableName()
+ + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
+ + ", created_time, modified_time, metric_names) "
+ + "VALUES ("
+ + StringUtils.repeat("?,", info.getPrimaryKeyList().length)
+ + "?, ?, ?)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TimelineEntity write SQL: " + sql);
+ }
+
+ try (Connection conn = getConnection();
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ for (TimelineEntity entity : entities.getEntities()) {
+ HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
+ if (entity.getMetrics() != null) {
+ for (TimelineMetric m : entity.getMetrics()) {
+ formattedMetrics.put(m.getId(), m);
+ }
+ }
+ int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
+ ps.setLong(idx++, entity.getCreatedTime());
+ ps.setLong(idx++, entity.getModifiedTime());
+ ps.setString(idx++, StringUtils.join(formattedMetrics.keySet().toArray(),
+ AGGREGATION_STORAGE_SEPARATOR));
+ ps.execute();
+
+ storeEntityVariableLengthFields(entity, formattedMetrics, context, conn,
+ info);
+
+ conn.commit();
+ }
+ } catch (SQLException se) {
+ LOG.error("Failed to add entity to Phoenix " + se.getMessage());
+ throw new IOException(se);
+ } catch (Exception e) {
+ LOG.error("Exception on getting connection: " + e.getMessage());
+ throw new IOException(e);
+ }
+ return response;
+ }
+
+ /**
+ * Create Phoenix tables for offline aggregation storage if the tables do not
+ * exist.
+ *
+ * @throws IOException
+ */
+ public void createPhoenixTables() throws IOException {
+ // Create tables if necessary
+ try (Connection conn = getConnection();
+ Statement stmt = conn.createStatement()) {
+ // Table schema defined as in YARN-3817.
+ String sql = "CREATE TABLE IF NOT EXISTS "
+ + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
+ + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+ + "flow_name VARCHAR NOT NULL, "
+ + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
+ + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+ + "metric_names VARCHAR, info_keys VARCHAR "
+ + "CONSTRAINT pk PRIMARY KEY("
+ + "user, cluster, flow_name))";
+ stmt.executeUpdate(sql);
+ sql = "CREATE TABLE IF NOT EXISTS "
+ + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
+ + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+ + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
+ + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+ + "metric_names VARCHAR, info_keys VARCHAR "
+ + "CONSTRAINT pk PRIMARY KEY(user, cluster))";
+ stmt.executeUpdate(sql);
+ conn.commit();
+ } catch (SQLException se) {
+ LOG.error("Failed in init data " + se.getLocalizedMessage());
+ throw new IOException(se);
+ }
+ return;
+ }
+
+ // Utility functions
+ @Private
+ @VisibleForTesting
+ Connection getConnection() throws IOException {
+ Connection conn;
+ try {
+ conn = DriverManager.getConnection(connString, connProperties);
+ conn.setAutoCommit(false);
+ } catch (SQLException se) {
+ LOG.error("Failed to connect to phoenix server! "
+ + se.getLocalizedMessage());
+ throw new IOException(se);
+ }
+ return conn;
+ }
+
+ // WARNING: This method will permanently drop a table!
+ @Private
+ @VisibleForTesting
+ void dropTable(String tableName) throws Exception {
+ try (Connection conn = getConnection();
+ Statement stmt = conn.createStatement()) {
+ String sql = "DROP TABLE " + tableName;
+ stmt.executeUpdate(sql);
+ } catch (SQLException se) {
+ LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
+ throw se;
+ }
+ }
+
+ private static class DynamicColumns<K> {
+ static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
+ static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
+ String columnFamilyPrefix;
+ String type;
+ Set<K> columns;
+
+ public DynamicColumns(String columnFamilyPrefix, String type,
+ Set<K> keyValues) {
+ this.columnFamilyPrefix = columnFamilyPrefix;
+ this.columns = keyValues;
+ this.type = type;
+ }
+ }
+
+ private static <K> StringBuilder appendColumnsSQL(
+ StringBuilder colNames, DynamicColumns<K> cfInfo) {
+ // Prepare the sql template by iterating through all keys
+ for (K key : cfInfo.columns) {
+ colNames.append(",").append(cfInfo.columnFamilyPrefix)
+ .append(key.toString()).append(cfInfo.type);
+ }
+ return colNames;
+ }
+
+ private static <K, V> int setValuesForColumnFamily(
+ PreparedStatement ps, Map<K, V> keyValues, int startPos,
+ boolean converToBytes) throws SQLException {
+ int idx = startPos;
+ for (Map.Entry<K, V> entry : keyValues.entrySet()) {
+ V value = entry.getValue();
+ if (value instanceof Collection) {
+ ps.setString(idx++, StringUtils.join(
+ (Collection) value, AGGREGATION_STORAGE_SEPARATOR));
+ } else {
+ if (converToBytes) {
+ try {
+ ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
+ } catch (IOException ie) {
+ LOG.error("Exception in converting values into bytes "
+ + ie.getMessage());
+ throw new SQLException(ie);
+ }
+ } else {
+ ps.setString(idx++, value.toString());
+ }
+ }
+ }
+ return idx;
+ }
+
+ private static <K, V> int setBytesForColumnFamily(
+ PreparedStatement ps, Map<K, V> keyValues, int startPos)
+ throws SQLException {
+ return setValuesForColumnFamily(ps, keyValues, startPos, true);
+ }
+
+ private static <K, V> int setStringsForColumnFamily(
+ PreparedStatement ps, Map<K, V> keyValues, int startPos)
+ throws SQLException {
+ return setValuesForColumnFamily(ps, keyValues, startPos, false);
+ }
+
+ private static void storeEntityVariableLengthFields(TimelineEntity entity,
+ Map<String, TimelineMetric> formattedMetrics,
+ TimelineCollectorContext context, Connection conn,
+ OfflineAggregationInfo aggregationInfo) throws SQLException {
+ int numPlaceholders = 0;
+ StringBuilder columnDefs = new StringBuilder(
+ StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
+ if (formattedMetrics != null && formattedMetrics.size() > 0) {
+ appendColumnsSQL(columnDefs, new DynamicColumns<>(
+ METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
+ formattedMetrics.keySet()));
+ numPlaceholders += formattedMetrics.keySet().size();
+ }
+ if (numPlaceholders == 0) {
+ return;
+ }
+ StringBuilder placeholders = new StringBuilder();
+ placeholders.append(
+ StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
+ // numPlaceholders >= 1 now
+ placeholders.append("?")
+ .append(StringUtils.repeat(",?", numPlaceholders - 1));
+ String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
+ .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
+ .append(") VALUES(").append(placeholders).append(")").toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SQL statement for variable length fields: "
+ + sqlVariableLengthFields);
+ }
+ // Use try with resource statement for the prepared statement
+ try (PreparedStatement psVariableLengthFields =
+ conn.prepareStatement(sqlVariableLengthFields)) {
+ int idx = aggregationInfo.setStringsForPrimaryKey(
+ psVariableLengthFields, context, null, 1);
+ if (formattedMetrics != null && formattedMetrics.size() > 0) {
+ idx = setBytesForColumnFamily(
+ psVariableLengthFields, formattedMetrics, idx);
+ }
+ psVariableLengthFields.execute();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
deleted file mode 100644
index 381ff17..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
+++ /dev/null
@@ -1,530 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-@Private
-@Unstable
-public class PhoenixTimelineWriterImpl extends AbstractService
- implements TimelineWriter {
-
- public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR
- = YarnConfiguration.TIMELINE_SERVICE_PREFIX
- + "writer.phoenix.connectionString";
-
- public static final String TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT
- = "jdbc:phoenix:localhost:2181:/hbase";
-
- private static final Log LOG
- = LogFactory.getLog(PhoenixTimelineWriterImpl.class);
- private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
- = "timeline_cf_placeholder";
- // These lists are not taking effects in table creations.
- private static final String[] PHOENIX_STORAGE_PK_LIST
- = {"cluster", "user", "flow_name", "flow_version", "flow_run", "app_id",
- "type", "entity_id"};
- private static final String[] TIMELINE_EVENT_EXTRA_PK_LIST =
- {"timestamp", "event_id"};
- private static final String[] TIMELINE_METRIC_EXTRA_PK_LIST =
- {"metric_id"};
- /** Default Phoenix JDBC driver name */
- private static final String DRIVER_CLASS_NAME
- = "org.apache.phoenix.jdbc.PhoenixDriver";
-
- /** Default Phoenix timeline entity table name */
- @VisibleForTesting
- static final String ENTITY_TABLE_NAME = "timeline_entity";
- /** Default Phoenix event table name */
- @VisibleForTesting
- static final String EVENT_TABLE_NAME = "timeline_event";
- /** Default Phoenix metric table name */
- @VisibleForTesting
- static final String METRIC_TABLE_NAME = "metric_singledata";
-
- /** Default Phoenix timeline config column family */
- private static final String CONFIG_COLUMN_FAMILY = "c.";
- /** Default Phoenix timeline info column family */
- private static final String INFO_COLUMN_FAMILY = "i.";
- /** Default Phoenix event info column family */
- private static final String EVENT_INFO_COLUMN_FAMILY = "ei.";
- /** Default Phoenix isRelatedTo column family */
- private static final String IS_RELATED_TO_FAMILY = "ir.";
- /** Default Phoenix relatesTo column family */
- private static final String RELATES_TO_FAMILY = "rt.";
- /** Default separator for Phoenix storage */
- private static final String PHOENIX_STORAGE_SEPARATOR = ";";
-
- /** Connection string to the deployed Phoenix cluster */
- @VisibleForTesting
- String connString = null;
- @VisibleForTesting
- Properties connProperties = new Properties();
-
- PhoenixTimelineWriterImpl() {
- super((PhoenixTimelineWriterImpl.class.getName()));
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- // so check it here and only read in the config if it's not overridden.
- connString =
- conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
- TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT);
- createTables();
- super.init(conf);
- }
-
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- }
-
- @Override
- public TimelineWriteResponse write(String clusterId, String userId,
- String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntities entities) throws IOException {
- TimelineWriteResponse response = new TimelineWriteResponse();
- TimelineCollectorContext currContext = new TimelineCollectorContext(
- clusterId, userId, flowName, flowVersion, flowRunId, appId);
- String sql = "UPSERT INTO " + ENTITY_TABLE_NAME
- + " (" + StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")
- + ", creation_time, modified_time, configs) "
- + "VALUES (" + StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)
- + "?, ?, ?)";
- if (LOG.isDebugEnabled()) {
- LOG.debug("TimelineEntity write SQL: " + sql);
- }
-
- try (Connection conn = getConnection();
- PreparedStatement ps = conn.prepareStatement(sql)) {
- for (TimelineEntity entity : entities.getEntities()) {
- int idx = setStringsForPrimaryKey(ps, currContext, entity, 1);
- ps.setLong(idx++, entity.getCreatedTime());
- ps.setLong(idx++, entity.getModifiedTime());
- String configKeys = StringUtils.join(
- entity.getConfigs().keySet(), PHOENIX_STORAGE_SEPARATOR);
- ps.setString(idx++, configKeys);
- ps.execute();
-
- storeEntityVariableLengthFields(entity, currContext, conn);
- storeEvents(entity, currContext, conn);
- storeMetrics(entity, currContext, conn);
-
- conn.commit();
- }
- } catch (SQLException se) {
- LOG.error("Failed to add entity to Phoenix " + se.getMessage());
- throw new IOException(se);
- } catch (Exception e) {
- LOG.error("Exception on getting connection: " + e.getMessage());
- throw new IOException(e);
- }
- return response;
- }
-
- /**
- * Aggregates the entity information to the timeline store based on which
- * track this entity is to be rolled up to The tracks along which aggregations
- * are to be done are given by {@link TimelineAggregationTrack}
- *
- * Any errors occurring for individual write request objects will be reported
- * in the response.
- *
- * @param data
- * a {@link TimelineEntity} object
- * a {@link TimelineAggregationTrack} enum value
- * @return a {@link TimelineWriteResponse} object.
- * @throws IOException
- */
- @Override
- public TimelineWriteResponse aggregate(TimelineEntity data,
- TimelineAggregationTrack track) throws IOException {
- return null;
-
- }
-
- @Override
- public void flush() throws IOException {
- // currently no-op
- }
-
- // Utility functions
- @Private
- @VisibleForTesting
- Connection getConnection() throws IOException {
- Connection conn;
- try {
- Class.forName(DRIVER_CLASS_NAME);
- conn = DriverManager.getConnection(connString, connProperties);
- conn.setAutoCommit(false);
- } catch (SQLException se) {
- LOG.error("Failed to connect to phoenix server! "
- + se.getLocalizedMessage());
- throw new IOException(se);
- } catch (ClassNotFoundException e) {
- LOG.error("Class not found! " + e.getLocalizedMessage());
- throw new IOException(e);
- }
- return conn;
- }
-
- private void createTables() throws Exception {
- // Create tables if necessary
- try (Connection conn = getConnection();
- Statement stmt = conn.createStatement()) {
- // Table schema defined as in YARN-3134.
- String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME
- + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
- + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
- + "flow_run UNSIGNED_LONG NOT NULL, "
- + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
- + "entity_id VARCHAR NOT NULL, "
- + "creation_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
- + "configs VARCHAR, "
- + CONFIG_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
- + INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
- + IS_RELATED_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
- + RELATES_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR "
- + "CONSTRAINT pk PRIMARY KEY("
- + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
- + "type, entity_id))";
- stmt.executeUpdate(sql);
- sql = "CREATE TABLE IF NOT EXISTS " + EVENT_TABLE_NAME
- + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
- + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
- + "flow_run UNSIGNED_LONG NOT NULL, "
- + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
- + "entity_id VARCHAR NOT NULL, "
- + "timestamp UNSIGNED_LONG NOT NULL, event_id VARCHAR NOT NULL, "
- + EVENT_INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY "
- + "CONSTRAINT pk PRIMARY KEY("
- + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
- + "type, entity_id, timestamp DESC, event_id))";
- stmt.executeUpdate(sql);
- sql = "CREATE TABLE IF NOT EXISTS " + METRIC_TABLE_NAME
- + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
- + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
- + "flow_run UNSIGNED_LONG NOT NULL, "
- + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
- + "entity_id VARCHAR NOT NULL, "
- + "metric_id VARCHAR NOT NULL, "
- + "singledata VARBINARY, "
- + "time UNSIGNED_LONG "
- + "CONSTRAINT pk PRIMARY KEY("
- + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
- + "type, entity_id, metric_id))";
- stmt.executeUpdate(sql);
- conn.commit();
- } catch (SQLException se) {
- LOG.error("Failed in init data " + se.getLocalizedMessage());
- throw se;
- }
- return;
- }
-
- private static class DynamicColumns<K> {
- static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
- static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
- String columnFamilyPrefix;
- String type;
- Set<K> columns;
-
- public DynamicColumns(String columnFamilyPrefix, String type,
- Set<K> keyValues) {
- this.columnFamilyPrefix = columnFamilyPrefix;
- this.columns = keyValues;
- this.type = type;
- }
- }
-
- private static <K> StringBuilder appendColumnsSQL(
- StringBuilder colNames, DynamicColumns<K> cfInfo) {
- // Prepare the sql template by iterating through all keys
- for (K key : cfInfo.columns) {
- colNames.append(",").append(cfInfo.columnFamilyPrefix)
- .append(key.toString()).append(cfInfo.type);
- }
- return colNames;
- }
-
- private static <K, V> int setValuesForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos,
- boolean converToBytes) throws SQLException {
- int idx = startPos;
- for (Map.Entry<K, V> entry : keyValues.entrySet()) {
- V value = entry.getValue();
- if (value instanceof Collection) {
- ps.setString(idx++, StringUtils.join(
- (Collection) value, PHOENIX_STORAGE_SEPARATOR));
- } else {
- if (converToBytes) {
- try {
- ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
- } catch (IOException ie) {
- LOG.error("Exception in converting values into bytes "
- + ie.getMessage());
- throw new SQLException(ie);
- }
- } else {
- ps.setString(idx++, value.toString());
- }
- }
- }
- return idx;
- }
-
- private static <K, V> int setBytesForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos)
- throws SQLException {
- return setValuesForColumnFamily(ps, keyValues, startPos, true);
- }
-
- private static <K, V> int setStringsForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos)
- throws SQLException {
- return setValuesForColumnFamily(ps, keyValues, startPos, false);
- }
-
- private static int setStringsForPrimaryKey(PreparedStatement ps,
- TimelineCollectorContext context, TimelineEntity entity, int startPos)
- throws SQLException {
- int idx = startPos;
- ps.setString(idx++, context.getClusterId());
- ps.setString(idx++, context.getUserId());
- ps.setString(idx++,
- context.getFlowName());
- ps.setString(idx++, context.getFlowVersion());
- ps.setLong(idx++, context.getFlowRunId());
- ps.setString(idx++, context.getAppId());
- ps.setString(idx++, entity.getType());
- ps.setString(idx++, entity.getId());
- return idx;
- }
-
- private static void storeEntityVariableLengthFields(TimelineEntity entity,
- TimelineCollectorContext context, Connection conn) throws SQLException {
- int numPlaceholders = 0;
- StringBuilder columnDefs = new StringBuilder(
- StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
- if (entity.getConfigs() != null) {
- Set<String> keySet = entity.getConfigs().keySet();
- appendColumnsSQL(columnDefs, new DynamicColumns<>(
- CONFIG_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
- keySet));
- numPlaceholders += keySet.size();
- }
- if (entity.getInfo() != null) {
- Set<String> keySet = entity.getInfo().keySet();
- appendColumnsSQL(columnDefs, new DynamicColumns<>(
- INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
- keySet));
- numPlaceholders += keySet.size();
- }
- if (entity.getIsRelatedToEntities() != null) {
- Set<String> keySet = entity.getIsRelatedToEntities().keySet();
- appendColumnsSQL(columnDefs, new DynamicColumns<>(
- IS_RELATED_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
- keySet));
- numPlaceholders += keySet.size();
- }
- if (entity.getRelatesToEntities() != null) {
- Set<String> keySet = entity.getRelatesToEntities().keySet();
- appendColumnsSQL(columnDefs, new DynamicColumns<>(
- RELATES_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
- keySet));
- numPlaceholders += keySet.size();
- }
- if (numPlaceholders == 0) {
- return;
- }
- StringBuilder placeholders = new StringBuilder();
- placeholders.append(
- StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length));
- // numPlaceholders >= 1 now
- placeholders.append("?")
- .append(StringUtils.repeat(",?", numPlaceholders - 1));
- String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
- .append(ENTITY_TABLE_NAME).append(" (").append(columnDefs)
- .append(") VALUES(").append(placeholders).append(")").toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL statement for variable length fields: "
- + sqlVariableLengthFields);
- }
- // Use try with resource statement for the prepared statement
- try (PreparedStatement psVariableLengthFields =
- conn.prepareStatement(sqlVariableLengthFields)) {
- int idx = setStringsForPrimaryKey(
- psVariableLengthFields, context, entity, 1);
- if (entity.getConfigs() != null) {
- idx = setStringsForColumnFamily(
- psVariableLengthFields, entity.getConfigs(), idx);
- }
- if (entity.getInfo() != null) {
- idx = setBytesForColumnFamily(
- psVariableLengthFields, entity.getInfo(), idx);
- }
- if (entity.getIsRelatedToEntities() != null) {
- idx = setStringsForColumnFamily(
- psVariableLengthFields, entity.getIsRelatedToEntities(), idx);
- }
- if (entity.getRelatesToEntities() != null) {
- idx = setStringsForColumnFamily(
- psVariableLengthFields, entity.getRelatesToEntities(), idx);
- }
- psVariableLengthFields.execute();
- }
- }
-
- private static void storeMetrics(TimelineEntity entity,
- TimelineCollectorContext context, Connection conn) throws SQLException {
- if (entity.getMetrics() == null) {
- return;
- }
- Set<TimelineMetric> metrics = entity.getMetrics();
- for (TimelineMetric metric : metrics) {
- StringBuilder sqlColumns = new StringBuilder(
- StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
- sqlColumns.append(",")
- .append(StringUtils.join(TIMELINE_METRIC_EXTRA_PK_LIST, ","));
- sqlColumns.append(",").append("singledata, time");
- StringBuilder placeholders = new StringBuilder();
- placeholders.append(
- StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
- .append(StringUtils.repeat("?,", TIMELINE_METRIC_EXTRA_PK_LIST.length));
- placeholders.append("?, ?");
- String sqlMetric = new StringBuilder("UPSERT INTO ")
- .append(METRIC_TABLE_NAME).append(" (").append(sqlColumns)
- .append(") VALUES(").append(placeholders).append(")").toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL statement for metric: " + sqlMetric);
- }
- try (PreparedStatement psMetrics = conn.prepareStatement(sqlMetric)) {
- if (metric.getType().equals(TimelineMetric.Type.TIME_SERIES)) {
- LOG.warn("The incoming timeline metric contains time series data, "
- + "which is currently not supported by Phoenix storage. "
- + "Time series will be truncated. ");
- }
- int idx = setStringsForPrimaryKey(psMetrics, context, entity, 1);
- psMetrics.setString(idx++, metric.getId());
- Iterator<Map.Entry<Long, Number>> currNumIter =
- metric.getValues().entrySet().iterator();
- if (currNumIter.hasNext()) {
- // TODO: support time series storage
- Map.Entry<Long, Number> currEntry = currNumIter.next();
- psMetrics.setBytes(idx++,
- GenericObjectMapper.write(currEntry.getValue()));
- psMetrics.setLong(idx++, currEntry.getKey());
- } else {
- psMetrics.setBytes(idx++, GenericObjectMapper.write(null));
- LOG.warn("The incoming metric contains an empty value set. ");
- }
- psMetrics.execute();
- } catch (IOException ie) {
- LOG.error("Exception on converting single data to bytes: "
- + ie.getMessage());
- throw new SQLException(ie);
- }
- }
- }
-
- private static void storeEvents(TimelineEntity entity,
- TimelineCollectorContext context, Connection conn) throws SQLException {
- if (entity.getEvents() == null) {
- return;
- }
- Set<TimelineEvent> events = entity.getEvents();
- for (TimelineEvent event : events) {
- // We need this number to check if the incoming event's info field is empty
- int numPlaceholders = 0;
- StringBuilder sqlColumns = new StringBuilder(
- StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
- sqlColumns.append(",")
- .append(StringUtils.join(TIMELINE_EVENT_EXTRA_PK_LIST, ","));
- appendColumnsSQL(sqlColumns, new DynamicColumns<>(
- EVENT_INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
- event.getInfo().keySet()));
- numPlaceholders += event.getInfo().keySet().size();
- if (numPlaceholders == 0) {
- continue;
- }
- StringBuilder placeholders = new StringBuilder();
- placeholders.append(
- StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
- .append(StringUtils.repeat("?,", TIMELINE_EVENT_EXTRA_PK_LIST.length));
- // numPlaceholders >= 1 now
- placeholders.append("?")
- .append(StringUtils.repeat(",?", numPlaceholders - 1));
- String sqlEvents = new StringBuilder("UPSERT INTO ")
- .append(EVENT_TABLE_NAME).append(" (").append(sqlColumns)
- .append(") VALUES(").append(placeholders).append(")").toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL statement for events: " + sqlEvents);
- }
- try (PreparedStatement psEvent = conn.prepareStatement(sqlEvents)) {
- int idx = setStringsForPrimaryKey(psEvent, context, entity, 1);
- psEvent.setLong(idx++, event.getTimestamp());
- psEvent.setString(idx++, event.getId());
- setBytesForColumnFamily(psEvent, event.getInfo(), idx);
- psEvent.execute();
- }
- }
- }
-
- // WARNING: This method will permanently drop a table!
- @Private
- @VisibleForTesting
- void dropTable(String tableName) throws Exception {
- try (Connection conn = getConnection();
- Statement stmt = conn.createStatement()) {
- String sql = "DROP TABLE " + tableName;
- stmt.executeUpdate(sql);
- } catch (SQLException se) {
- LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
- throw se;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 3a22ed6..5120856 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -51,6 +53,7 @@ public class TimelineSchemaCreator {
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
+ private static final String PHOENIX_OPTION_SHORT = "p";
public static void main(String[] args) throws Exception {
@@ -83,7 +86,41 @@ public class TimelineSchemaCreator {
hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
applicationTableName);
}
- createAllTables(hbaseConf);
+
+ List<Exception> exceptions = new ArrayList<>();
+ try {
+ createAllTables(hbaseConf);
+ LOG.info("Successfully created HBase schema. ");
+ } catch (IOException e) {
+ LOG.error("Error in creating hbase tables: " + e.getMessage());
+ exceptions.add(e);
+ }
+
+ // Create Phoenix data schema if needed
+ if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
+ Configuration phoenixConf = new Configuration();
+ try {
+ PhoenixOfflineAggregationWriterImpl phoenixWriter =
+ new PhoenixOfflineAggregationWriterImpl();
+ phoenixWriter.init(phoenixConf);
+ phoenixWriter.start();
+ phoenixWriter.createPhoenixTables();
+ phoenixWriter.stop();
+ LOG.info("Successfully created Phoenix offline aggregation schema. ");
+ } catch (IOException e) {
+ LOG.error("Error in creating phoenix tables: " + e.getMessage());
+ exceptions.add(e);
+ }
+ }
+ if (exceptions.size() > 0) {
+ LOG.warn("Schema creation finished with the following exceptions");
+ for (Exception e : exceptions) {
+ LOG.warn(e.getMessage());
+ }
+ System.exit(-1);
+ } else {
+ LOG.info("Schema creation finished successfully");
+ }
}
/**
@@ -115,6 +152,12 @@ public class TimelineSchemaCreator {
o.setRequired(false);
options.addOption(o);
+ o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
+ "create Phoenix offline aggregation tables");
+ // No need to set arg name since we do not need an argument here
+ o.setRequired(false);
+ options.addOption(o);
+
CommandLineParser parser = new PosixParser();
CommandLine commandLine = null;
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
new file mode 100644
index 0000000..16c03a3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Class to carry the offline aggregation information for storage level
+ * implementations. There are currently two predefined aggregation info
+ * instances that represent flow and user level offline aggregations. Depend on
+ * its implementation, a storage class may use an OfflineAggregationInfo object
+ * to decide behaviors dynamically.
+ */
+public final class OfflineAggregationInfo {
+ /**
+ * Default flow level aggregation table name
+ */
+ @VisibleForTesting
+ public static final String FLOW_AGGREGATION_TABLE_NAME
+ = "yarn_timeline_flow_aggregation";
+ /**
+ * Default user level aggregation table name
+ */
+ public static final String USER_AGGREGATION_TABLE_NAME
+ = "yarn_timeline_user_aggregation";
+
+ // These lists are not taking effects in table creations.
+ private static final String[] FLOW_AGGREGATION_PK_LIST =
+ { "user", "cluster", "flow_name" };
+ private static final String[] USER_AGGREGATION_PK_LIST = { "user", "cluster"};
+
+ private final String tableName;
+ private final String[] primaryKeyList;
+ private final PrimaryKeyStringSetter primaryKeyStringSetter;
+
+ private OfflineAggregationInfo(String table, String[] pkList,
+ PrimaryKeyStringSetter formatter) {
+ tableName = table;
+ primaryKeyList = pkList;
+ primaryKeyStringSetter = formatter;
+ }
+
+ private interface PrimaryKeyStringSetter {
+ int setValues(PreparedStatement ps, TimelineCollectorContext context,
+ String[] extraInfo, int startPos) throws SQLException;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String[] getPrimaryKeyList() {
+ return primaryKeyList.clone();
+ }
+
+ public int setStringsForPrimaryKey(PreparedStatement ps,
+ TimelineCollectorContext context, String[] extraInfo, int startPos)
+ throws SQLException {
+ return primaryKeyStringSetter.setValues(ps, context, extraInfo, startPos);
+ }
+
+ public static final OfflineAggregationInfo FLOW_AGGREGATION =
+ new OfflineAggregationInfo(FLOW_AGGREGATION_TABLE_NAME,
+ FLOW_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
+ @Override
+ public int setValues(PreparedStatement ps,
+ TimelineCollectorContext context, String[] extraInfo, int startPos)
+ throws SQLException {
+ int idx = startPos;
+ ps.setString(idx++, context.getUserId());
+ ps.setString(idx++, context.getClusterId());
+ ps.setString(idx++, context.getFlowName());
+ return idx;
+ }
+ });
+
+ public static final OfflineAggregationInfo USER_AGGREGATION =
+ new OfflineAggregationInfo(USER_AGGREGATION_TABLE_NAME,
+ USER_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
+ @Override
+ public int setValues(PreparedStatement ps,
+ TimelineCollectorContext context, String[] extraInfo, int startPos)
+ throws SQLException {
+ int idx = startPos;
+ ps.setString(idx++, context.getUserId());
+ ps.setString(idx++, context.getClusterId());
+ return idx;
+ }
+ });
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
new file mode 100644
index 0000000..de66a17
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+
+public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
+ private static PhoenixOfflineAggregationWriterImpl storage;
+ private static final int BATCH_SIZE = 3;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ storage = setupPhoenixClusterAndWriterForTest(conf);
+ }
+
+ @Test(timeout = 90000)
+ public void testFlowLevelAggregationStorage() throws Exception {
+ testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
+ }
+
+ @Test(timeout = 90000)
+ public void testUserLevelAggregationStorage() throws Exception {
+ testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
+ storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
+ tearDownMiniCluster();
+ }
+
+ private static PhoenixOfflineAggregationWriterImpl
+ setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
+ throws Exception{
+ Map<String, String> props = new HashMap<>();
+ // Must update config before starting server
+ props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ Boolean.FALSE.toString());
+ props.put("java.security.krb5.realm", "");
+ props.put("java.security.krb5.kdc", "");
+ props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
+ Boolean.FALSE.toString());
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+ props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+ // Make a small batch size to test multiple calls to reserve sequences
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
+ Long.toString(BATCH_SIZE));
+ // Must update config before starting server
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+ // Change connection settings for test
+ conf.set(
+ YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
+ getUrl());
+ PhoenixOfflineAggregationWriterImpl
+ myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
+ myWriter.init(conf);
+ myWriter.start();
+ myWriter.createPhoenixTables();
+ return myWriter;
+ }
+
+ private static TimelineEntity getTestAggregationTimelineEntity() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "hello1";
+ String type = "testAggregationType";
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(1425016501000L);
+ entity.setModifiedTime(1425016502000L);
+
+ TimelineMetric metric = new TimelineMetric();
+ metric.setId("HDFS_BYTES_READ");
+ metric.addValue(1425016501100L, 8000);
+ entity.addMetric(metric);
+
+ return entity;
+ }
+
+ private void testAggregator(OfflineAggregationInfo aggregationInfo)
+ throws Exception {
+ // Set up a list of timeline entities and write them back to Phoenix
+ int numEntity = 1;
+ TimelineEntities te = new TimelineEntities();
+ te.addEntity(getTestAggregationTimelineEntity());
+ TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
+ "user1", "testFlow", null, 0, null);
+ storage.writeAggregatedEntity(context, te,
+ aggregationInfo);
+
+ // Verify if we're storing all entities
+ String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
+ String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
+ +") FROM " + aggregationInfo.getTableName();
+ verifySQLWithCount(sql, numEntity, "Number of entities should be ");
+ // Check metric
+ sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
+ + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
+ verifySQLWithCount(sql, numEntity,
+ "Number of entities with info should be ");
+ }
+
+
+ private void verifySQLWithCount(String sql, int targetCount, String message)
+ throws Exception {
+ try (
+ Statement stmt =
+ storage.getConnection().createStatement();
+ ResultSet rs = stmt.executeQuery(sql)) {
+ assertTrue("Result set empty on statement " + sql, rs.next());
+ assertNotNull("Fail to execute query " + sql, rs);
+ assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
+ } catch (SQLException se) {
+ fail("SQL exception on query: " + sql
+ + " With exception message: " + se.getLocalizedMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
deleted file mode 100644
index dece83d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-
-public class TestPhoenixTimelineWriterImpl extends BaseTest {
- private static PhoenixTimelineWriterImpl writer;
- private static final int BATCH_SIZE = 3;
-
- @BeforeClass
- public static void setup() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- writer = setupPhoenixClusterAndWriterForTest(conf);
- }
-
- @Test(timeout = 90000)
- public void testPhoenixWriterBasic() throws Exception {
- // Set up a list of timeline entities and write them back to Phoenix
- int numEntity = 12;
- TimelineEntities te =
- TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
- writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te);
- // Verify if we're storing all entities
- String sql = "SELECT COUNT(entity_id) FROM "
- + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
- verifySQLWithCount(sql, numEntity, "Number of entities should be ");
- // Check config (half of all entities)
- sql = "SELECT COUNT(c.config) FROM "
- + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(c.config VARCHAR) ";
- verifySQLWithCount(sql, (numEntity / 2),
- "Number of entities with config should be ");
- // Check info (half of all entities)
- sql = "SELECT COUNT(i.info1) FROM "
- + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(i.info1 VARBINARY) ";
- verifySQLWithCount(sql, (numEntity / 2),
- "Number of entities with info should be ");
- // Check config and info (a quarter of all entities)
- sql = "SELECT COUNT(entity_id) FROM "
- + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
- + "(c.config VARCHAR, i.info1 VARBINARY) "
- + "WHERE c.config IS NOT NULL AND i.info1 IS NOT NULL";
- verifySQLWithCount(sql, (numEntity / 4),
- "Number of entities with both config and info should be ");
- // Check relatesToEntities and isRelatedToEntities
- sql = "SELECT COUNT(entity_id) FROM "
- + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
- + "(rt.testType VARCHAR, ir.testType VARCHAR) "
- + "WHERE rt.testType IS NOT NULL AND ir.testType IS NOT NULL";
- verifySQLWithCount(sql, numEntity - 2,
- "Number of entities with both relatesTo and isRelatedTo should be ");
- // Check event
- sql = "SELECT COUNT(entity_id) FROM "
- + PhoenixTimelineWriterImpl.EVENT_TABLE_NAME;
- verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
- // Check metrics
- sql = "SELECT COUNT(entity_id) FROM "
- + PhoenixTimelineWriterImpl.METRIC_TABLE_NAME;
- verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
- }
-
- @AfterClass
- public static void cleanup() throws Exception {
- writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
- writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
- writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
- writer.serviceStop();
- tearDownMiniCluster();
- }
-
- private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest(
- YarnConfiguration conf) throws Exception{
- Map<String, String> props = new HashMap<>();
- // Must update config before starting server
- props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
- Boolean.FALSE.toString());
- props.put("java.security.krb5.realm", "");
- props.put("java.security.krb5.kdc", "");
- props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
- Boolean.FALSE.toString());
- props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
- props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
- // Make a small batch size to test multiple calls to reserve sequences
- props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
- Long.toString(BATCH_SIZE));
- // Must update config before starting server
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-
- PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
- // Change connection settings for test
- conf.set(
- PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
- getUrl());
- myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- myWriter.serviceInit(conf);
- return myWriter;
- }
-
- private void verifySQLWithCount(String sql, int targetCount, String message)
- throws Exception {
- try (
- Statement stmt =
- writer.getConnection().createStatement();
- ResultSet rs = stmt.executeQuery(sql)) {
- assertTrue("Result set empty on statement " + sql, rs.next());
- assertNotNull("Fail to execute query " + sql, rs);
- assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
- } catch (SQLException se) {
- fail("SQL exception on query: " + sql
- + " With exception message: " + se.getLocalizedMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
deleted file mode 100644
index 7a7afc0..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-public class TestTimelineWriterImpl {
- static TimelineEntities getStandardTestTimelineEntities(int listSize) {
- TimelineEntities te = new TimelineEntities();
- for (int i = 0; i < listSize; i++) {
- TimelineEntity entity = new TimelineEntity();
- String id = "hello" + i;
- String type = "testType";
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(1425016501000L + i);
- entity.setModifiedTime(1425016502000L + i);
- if (i > 0) {
- entity.addRelatesToEntity(type, "hello" + i);
- entity.addRelatesToEntity(type, "hello" + (i - 1));
- }
- if (i < listSize - 1) {
- entity.addIsRelatedToEntity(type, "hello" + i);
- entity.addIsRelatedToEntity(type, "hello" + (i + 1));
- }
- int category = i % 4;
- switch (category) {
- case 0:
- entity.addConfig("config", "config" + i);
- // Fall through deliberately
- case 1:
- entity.addInfo("info1", new Integer(i));
- entity.addInfo("info2", "helloworld");
- // Fall through deliberately
- case 2:
- break;
- case 3:
- entity.addConfig("config", "config" + i);
- TimelineEvent event = new TimelineEvent();
- event.setId("test event");
- event.setTimestamp(1425016501100L + i);
- event.addInfo("test_info", "content for " + entity.getId());
- event.addInfo("test_info1", new Integer(i));
- entity.addEvent(event);
- TimelineMetric metric = new TimelineMetric();
- metric.setId("HDFS_BYTES_READ");
- metric.addValue(1425016501100L + i, 8000 + i);
- entity.addMetric(metric);
- break;
- }
- te.addEntity(entity);
- }
- return te;
- }
-}