You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:22 UTC

[05/50] [abbrv] hadoop git commit: YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee)

YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a87a00ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a87a00ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a87a00ee

Branch: refs/heads/feature-YARN-2928
Commit: a87a00ee68ee929b8a297c97dad0999e800a9f59
Parents: 557fd5e
Author: Sangjin Lee <sj...@apache.org>
Authored: Mon Aug 17 16:48:58 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:37:47 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |   7 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +
 .../storage/OfflineAggregationWriter.java       |  66 +++
 .../PhoenixOfflineAggregationWriterImpl.java    | 356 +++++++++++++
 .../storage/PhoenixTimelineWriterImpl.java      | 530 -------------------
 .../storage/TimelineSchemaCreator.java          |  45 +-
 .../storage/common/OfflineAggregationInfo.java  | 110 ++++
 ...TestPhoenixOfflineAggregationWriterImpl.java | 162 ++++++
 .../storage/TestPhoenixTimelineWriterImpl.java  | 152 ------
 .../storage/TestTimelineWriterImpl.java         |  74 ---
 11 files changed, 754 insertions(+), 761 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c3a8172..42da97b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -88,6 +88,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3906. Split the application table from the entity table. (Sangjin Lee 
     via junping_du)
 
+    YARN-3904. Refactor timelineservice.storage to add support to online and
+    offline aggregation writers (Li Lu via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 0dcdd15..d36f245 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -499,13 +499,12 @@
   <!-- Ignore SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING warnings for Timeline Phoenix storage. -->
   <!-- Since we're using dynamic columns, we have to generate SQL statements dynamically -->
   <Match>
-    <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixTimelineWriterImpl" />
+    <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixOfflineAggregationWriterImpl" />
     <Or>
       <Method name="storeEntityVariableLengthFields" />
-      <Method name="storeEvents" />
-      <Method name="storeMetrics" />
-      <Method name="write" />
+      <Method name="writeAggregatedEntity" />
     </Or>
+    <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING" />
   </Match>
   
   <!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs -->

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b8281b0..2946240 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2005,6 +2005,16 @@ public class YarnConfiguration extends Configuration {
   public static final long    DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME =
       7*24*60*60*1000; // 7 days
 
+  // Timeline service v2 offline aggregation related keys
+  public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline.";
+  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR
+      = TIMELINE_OFFLINE_AGGREGATION_PREFIX
+          + "phoenix.connectionString";
+
+  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT
+      = "jdbc:phoenix:localhost:2181:/hbase";
+
   // ///////////////////////////////
   // Shared Cache Configs
   // ///////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
new file mode 100644
index 0000000..e1219e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+
+import java.io.IOException;
+
+/**
+ * YARN timeline service v2 offline aggregation storage interface
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class OfflineAggregationWriter extends AbstractService {
+
+  /**
+   * Construct the offline writer.
+   *
+   * @param name service name
+   */
+  public OfflineAggregationWriter(String name) {
+    super(name);
+  }
+
+  /**
+   * Persist aggregated timeline entities to the offline store based on which
+   * track this entity is to be rolled up to. The tracks along which aggregations
+   * are to be done are given by {@link OfflineAggregationInfo}.
+   *
+   * @param context a {@link TimelineCollectorContext} object that describes the
+   *                context information of the aggregated data. Depending on
+   *                the type of the aggregation, some fields of this context
+   *                may be empty or null.
+   * @param entities {@link TimelineEntities} to be persisted.
+   * @param info an {@link OfflineAggregationInfo} object that describes the
+   *             detail of the aggregation. The currently supported option is
+   *             {@link OfflineAggregationInfo#FLOW_AGGREGATION}.
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+  abstract TimelineWriteResponse writeAggregatedEntity(
+      TimelineCollectorContext context,
+      TimelineEntities entities, OfflineAggregationInfo info) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
new file mode 100644
index 0000000..4c1352c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.phoenix.util.PropertiesUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Offline aggregation Phoenix storage. This storage currently consists of two
+ * aggregation tables, one for flow level aggregation and one for user level
+ * aggregation.
+ *
+ * Example table record:
+ *
+ * <pre>
+ * |---------------------------|
+ * |  Primary   | Column Family|
+ * |  key       | metrics      |
+ * |---------------------------|
+ * | row_key    | metricId1:   |
+ * |            | metricValue1 |
+ * |            | @timestamp1  |
+ * |            |              |
+ * |            | metricId1:   |
+ * |            | metricValue2 |
+ * |            | @timestamp2  |
+ * |            |              |
+ * |            | metricId2:   |
+ * |            | metricValue1 |
+ * |            | @timestamp2  |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |            |              |
+ * |---------------------------|
+ * </pre>
+ *
+ * The flow aggregation table's primary key is (user, cluster, flow_name); the
+ * user aggregation table's primary key is (user, cluster).
+ *
+ * Metrics column family stores all aggregated metrics for each record.
+ */
+@Private
+@Unstable
+public class PhoenixOfflineAggregationWriterImpl
+    extends OfflineAggregationWriter {
+
+  private static final Log LOG
+      = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
+  private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
+      = "timeline_cf_placeholder";
+
+  /** Default Phoenix JDBC driver name */
+  private static final String DRIVER_CLASS_NAME
+      = "org.apache.phoenix.jdbc.PhoenixDriver";
+
+  /** Default Phoenix timeline metric column family */
+  private static final String METRIC_COLUMN_FAMILY = "m.";
+  /** Default Phoenix timeline info column family */
+  private static final String INFO_COLUMN_FAMILY = "i.";
+  /** Default separator for Phoenix storage */
+  private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
+
+  /** Connection string to the deployed Phoenix cluster */
+  private String connString = null;
+  private Properties connProperties = new Properties();
+
+  public PhoenixOfflineAggregationWriterImpl(Properties prop) {
+    super(PhoenixOfflineAggregationWriterImpl.class.getName());
+    connProperties = PropertiesUtil.deepCopy(prop);
+  }
+
+  public PhoenixOfflineAggregationWriterImpl() {
+    super(PhoenixOfflineAggregationWriterImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    // Load the Phoenix JDBC driver; throws if it is not on the classpath.
+    Class.forName(DRIVER_CLASS_NAME);
+    connString = conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
+        YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
+    // Chain to the parent hook; calling init() from here would re-enter it.
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public TimelineWriteResponse writeAggregatedEntity(
+      TimelineCollectorContext context, TimelineEntities entities,
+      OfflineAggregationInfo info) throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    String sql = "UPSERT INTO " + info.getTableName()
+        + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
+        + ", created_time, modified_time, metric_names) "
+        + "VALUES ("
+        + StringUtils.repeat("?,", info.getPrimaryKeyList().length)
+        + "?, ?, ?)";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("TimelineEntity write SQL: " + sql);
+    }
+
+    try (Connection conn = getConnection();
+        PreparedStatement ps = conn.prepareStatement(sql)) {
+      for (TimelineEntity entity : entities.getEntities()) {
+        HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
+        if (entity.getMetrics() != null) {
+          for (TimelineMetric m : entity.getMetrics()) {
+            formattedMetrics.put(m.getId(), m);
+          }
+        }
+        int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
+        ps.setLong(idx++, entity.getCreatedTime());
+        ps.setLong(idx++, entity.getModifiedTime());
+        ps.setString(idx++, StringUtils.join(formattedMetrics.keySet().toArray(),
+            AGGREGATION_STORAGE_SEPARATOR));
+        ps.execute();
+
+        storeEntityVariableLengthFields(entity, formattedMetrics, context, conn,
+            info);
+
+        conn.commit();
+      }
+    } catch (SQLException se) {
+      LOG.error("Failed to add entity to Phoenix " + se.getMessage());
+      throw new IOException(se);
+    } catch (Exception e) {
+      LOG.error("Exception on getting connection: " + e.getMessage());
+      throw new IOException(e);
+    }
+    return response;
+  }
+
+  /**
+   * Create Phoenix tables for offline aggregation storage if the tables do not
+   * exist.
+   *
+   * @throws IOException
+   */
+  public void createPhoenixTables() throws IOException {
+    // Create tables if necessary
+    try (Connection conn = getConnection();
+        Statement stmt = conn.createStatement()) {
+      // Table schema defined as in YARN-3817.
+      String sql = "CREATE TABLE IF NOT EXISTS "
+          + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "flow_name VARCHAR NOT NULL, "
+          + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
+          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+          + "metric_names VARCHAR, info_keys VARCHAR "
+          + "CONSTRAINT pk PRIMARY KEY("
+          + "user, cluster, flow_name))";
+      stmt.executeUpdate(sql);
+      sql = "CREATE TABLE IF NOT EXISTS "
+          + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
+          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+          + "metric_names VARCHAR, info_keys VARCHAR "
+          + "CONSTRAINT pk PRIMARY KEY(user, cluster))";
+      stmt.executeUpdate(sql);
+      conn.commit();
+    } catch (SQLException se) {
+      LOG.error("Failed in init data " + se.getLocalizedMessage());
+      throw new IOException(se);
+    }
+    return;
+  }
+
+  // Utility functions
+  @Private
+  @VisibleForTesting
+  Connection getConnection() throws IOException {
+    Connection conn;
+    try {
+      conn = DriverManager.getConnection(connString, connProperties);
+      conn.setAutoCommit(false);
+    } catch (SQLException se) {
+      LOG.error("Failed to connect to phoenix server! "
+          + se.getLocalizedMessage());
+      throw new IOException(se);
+    }
+    return conn;
+  }
+
+  // WARNING: This method will permanently drop a table!
+  @Private
+  @VisibleForTesting
+  void dropTable(String tableName) throws Exception {
+    try (Connection conn = getConnection();
+         Statement stmt = conn.createStatement()) {
+      String sql = "DROP TABLE " + tableName;
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
+      throw se;
+    }
+  }
+
+  private static class DynamicColumns<K> {
+    static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
+    static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
+    String columnFamilyPrefix;
+    String type;
+    Set<K> columns;
+
+    public DynamicColumns(String columnFamilyPrefix, String type,
+        Set<K> keyValues) {
+      this.columnFamilyPrefix = columnFamilyPrefix;
+      this.columns = keyValues;
+      this.type = type;
+    }
+  }
+
+  private static <K> StringBuilder appendColumnsSQL(
+      StringBuilder colNames, DynamicColumns<K> cfInfo) {
+    // Prepare the sql template by iterating through all keys
+    for (K key : cfInfo.columns) {
+      colNames.append(",").append(cfInfo.columnFamilyPrefix)
+          .append(key.toString()).append(cfInfo.type);
+    }
+    return colNames;
+  }
+
+  private static <K, V> int setValuesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos,
+      boolean converToBytes) throws SQLException {
+    int idx = startPos;
+    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
+      V value = entry.getValue();
+      if (value instanceof Collection) {
+        ps.setString(idx++, StringUtils.join(
+            (Collection) value, AGGREGATION_STORAGE_SEPARATOR));
+      } else {
+        if (converToBytes) {
+          try {
+            ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
+          } catch (IOException ie) {
+            LOG.error("Exception in converting values into bytes "
+                + ie.getMessage());
+            throw new SQLException(ie);
+          }
+        } else {
+          ps.setString(idx++, value.toString());
+        }
+      }
+    }
+    return idx;
+  }
+
+  private static <K, V> int setBytesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, true);
+  }
+
+  private static <K, V> int setStringsForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, false);
+  }
+
+  private static void storeEntityVariableLengthFields(TimelineEntity entity,
+      Map<String, TimelineMetric> formattedMetrics,
+      TimelineCollectorContext context, Connection conn,
+      OfflineAggregationInfo aggregationInfo) throws SQLException {
+    int numPlaceholders = 0;
+    StringBuilder columnDefs = new StringBuilder(
+        StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
+    if (formattedMetrics != null && formattedMetrics.size() > 0) {
+      appendColumnsSQL(columnDefs, new DynamicColumns<>(
+          METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
+          formattedMetrics.keySet()));
+      numPlaceholders += formattedMetrics.keySet().size();
+    }
+    if (numPlaceholders == 0) {
+      return;
+    }
+    StringBuilder placeholders = new StringBuilder();
+    placeholders.append(
+        StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
+    // numPlaceholders >= 1 now
+    placeholders.append("?")
+        .append(StringUtils.repeat(",?", numPlaceholders - 1));
+    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
+        .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
+        .append(") VALUES(").append(placeholders).append(")").toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL statement for variable length fields: "
+          + sqlVariableLengthFields);
+    }
+    // Use try with resource statement for the prepared statement
+    try (PreparedStatement psVariableLengthFields =
+        conn.prepareStatement(sqlVariableLengthFields)) {
+      int idx = aggregationInfo.setStringsForPrimaryKey(
+          psVariableLengthFields, context, null, 1);
+      if (formattedMetrics != null && formattedMetrics.size() > 0) {
+        idx = setBytesForColumnFamily(
+            psVariableLengthFields, formattedMetrics, idx);
+      }
+      psVariableLengthFields.execute();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
deleted file mode 100644
index 381ff17..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
+++ /dev/null
@@ -1,530 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-@Private
-@Unstable
-public class PhoenixTimelineWriterImpl extends AbstractService
-    implements TimelineWriter {
-
-  public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR
-      = YarnConfiguration.TIMELINE_SERVICE_PREFIX
-          + "writer.phoenix.connectionString";
-
-  public static final String TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT
-      = "jdbc:phoenix:localhost:2181:/hbase";
-
-  private static final Log LOG
-      = LogFactory.getLog(PhoenixTimelineWriterImpl.class);
-  private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
-      = "timeline_cf_placeholder";
-  // These lists are not taking effects in table creations.
-  private static final String[] PHOENIX_STORAGE_PK_LIST
-      = {"cluster", "user", "flow_name", "flow_version", "flow_run", "app_id",
-         "type", "entity_id"};
-  private static final String[] TIMELINE_EVENT_EXTRA_PK_LIST =
-      {"timestamp", "event_id"};
-  private static final String[] TIMELINE_METRIC_EXTRA_PK_LIST =
-      {"metric_id"};
-  /** Default Phoenix JDBC driver name */
-  private static final String DRIVER_CLASS_NAME
-      = "org.apache.phoenix.jdbc.PhoenixDriver";
-
-  /** Default Phoenix timeline entity table name */
-  @VisibleForTesting
-  static final String ENTITY_TABLE_NAME = "timeline_entity";
-  /** Default Phoenix event table name */
-  @VisibleForTesting
-  static final String EVENT_TABLE_NAME = "timeline_event";
-  /** Default Phoenix metric table name */
-  @VisibleForTesting
-  static final String METRIC_TABLE_NAME = "metric_singledata";
-
-  /** Default Phoenix timeline config column family */
-  private static final String CONFIG_COLUMN_FAMILY = "c.";
-  /** Default Phoenix timeline info column family */
-  private static final String INFO_COLUMN_FAMILY = "i.";
-  /** Default Phoenix event info column family */
-  private static final String EVENT_INFO_COLUMN_FAMILY = "ei.";
-  /** Default Phoenix isRelatedTo column family */
-  private static final String IS_RELATED_TO_FAMILY = "ir.";
-  /** Default Phoenix relatesTo column family */
-  private static final String RELATES_TO_FAMILY = "rt.";
-  /** Default separator for Phoenix storage */
-  private static final String PHOENIX_STORAGE_SEPARATOR = ";";
-
-  /** Connection string to the deployed Phoenix cluster */
-  @VisibleForTesting
-  String connString = null;
-  @VisibleForTesting
-  Properties connProperties = new Properties();
-
-  PhoenixTimelineWriterImpl() {
-    super((PhoenixTimelineWriterImpl.class.getName()));
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    // so check it here and only read in the config if it's not overridden.
-    connString =
-        conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
-        TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT);
-    createTables();
-    super.init(conf);
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  @Override
-  public TimelineWriteResponse write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities entities) throws IOException {
-    TimelineWriteResponse response = new TimelineWriteResponse();
-    TimelineCollectorContext currContext = new TimelineCollectorContext(
-        clusterId, userId, flowName, flowVersion, flowRunId, appId);
-    String sql = "UPSERT INTO " + ENTITY_TABLE_NAME
-        + " (" + StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")
-        + ", creation_time, modified_time, configs) "
-        + "VALUES (" + StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)
-        + "?, ?, ?)";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("TimelineEntity write SQL: " + sql);
-    }
-
-    try (Connection conn = getConnection();
-        PreparedStatement ps = conn.prepareStatement(sql)) {
-      for (TimelineEntity entity : entities.getEntities()) {
-        int idx = setStringsForPrimaryKey(ps, currContext, entity, 1);
-        ps.setLong(idx++, entity.getCreatedTime());
-        ps.setLong(idx++, entity.getModifiedTime());
-        String configKeys = StringUtils.join(
-            entity.getConfigs().keySet(), PHOENIX_STORAGE_SEPARATOR);
-        ps.setString(idx++, configKeys);
-        ps.execute();
-
-        storeEntityVariableLengthFields(entity, currContext, conn);
-        storeEvents(entity, currContext, conn);
-        storeMetrics(entity, currContext, conn);
-
-        conn.commit();
-      }
-    } catch (SQLException se) {
-      LOG.error("Failed to add entity to Phoenix " + se.getMessage());
-      throw new IOException(se);
-    } catch (Exception e) {
-      LOG.error("Exception on getting connection: " + e.getMessage());
-      throw new IOException(e);
-    }
-    return response;
-  }
-
-  /**
-   * Aggregates the entity information to the timeline store based on which
-   * track this entity is to be rolled up to The tracks along which aggregations
-   * are to be done are given by {@link TimelineAggregationTrack}
-   *
-   * Any errors occurring for individual write request objects will be reported
-   * in the response.
-   *
-   * @param data
-   *          a {@link TimelineEntity} object
-   *          a {@link TimelineAggregationTrack} enum value
-   * @return a {@link TimelineWriteResponse} object.
-   * @throws IOException
-   */
-  @Override
-  public TimelineWriteResponse aggregate(TimelineEntity data,
-      TimelineAggregationTrack track) throws IOException {
-    return null;
-
-  }
-
-  @Override
-  public void flush() throws IOException {
-    // currently no-op
-  }
-
-  // Utility functions
-  @Private
-  @VisibleForTesting
-  Connection getConnection() throws IOException {
-    Connection conn;
-    try {
-      Class.forName(DRIVER_CLASS_NAME);
-      conn = DriverManager.getConnection(connString, connProperties);
-      conn.setAutoCommit(false);
-    } catch (SQLException se) {
-      LOG.error("Failed to connect to phoenix server! "
-          + se.getLocalizedMessage());
-      throw new IOException(se);
-    } catch (ClassNotFoundException e) {
-      LOG.error("Class not found! " + e.getLocalizedMessage());
-      throw new IOException(e);
-    }
-    return conn;
-  }
-
-  private void createTables() throws Exception {
-    // Create tables if necessary
-    try (Connection conn = getConnection();
-        Statement stmt = conn.createStatement()) {
-      // Table schema defined as in YARN-3134.
-      String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME
-          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
-          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
-          + "flow_run UNSIGNED_LONG NOT NULL, "
-          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
-          + "entity_id VARCHAR NOT NULL, "
-          + "creation_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
-          + "configs VARCHAR, "
-          + CONFIG_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
-          + INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
-          + IS_RELATED_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
-          + RELATES_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR "
-          + "CONSTRAINT pk PRIMARY KEY("
-          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
-          + "type, entity_id))";
-      stmt.executeUpdate(sql);
-      sql = "CREATE TABLE IF NOT EXISTS " + EVENT_TABLE_NAME
-          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
-          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
-          + "flow_run UNSIGNED_LONG NOT NULL, "
-          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
-          + "entity_id VARCHAR NOT NULL, "
-          + "timestamp UNSIGNED_LONG NOT NULL, event_id VARCHAR NOT NULL, "
-          + EVENT_INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY "
-          + "CONSTRAINT pk PRIMARY KEY("
-          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
-          + "type, entity_id, timestamp DESC, event_id))";
-      stmt.executeUpdate(sql);
-      sql = "CREATE TABLE IF NOT EXISTS " + METRIC_TABLE_NAME
-          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
-          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
-          + "flow_run UNSIGNED_LONG NOT NULL, "
-          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
-          + "entity_id VARCHAR NOT NULL, "
-          + "metric_id VARCHAR NOT NULL, "
-          + "singledata VARBINARY, "
-          + "time UNSIGNED_LONG "
-          + "CONSTRAINT pk PRIMARY KEY("
-          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
-          + "type, entity_id, metric_id))";
-      stmt.executeUpdate(sql);
-      conn.commit();
-    } catch (SQLException se) {
-      LOG.error("Failed in init data " + se.getLocalizedMessage());
-      throw se;
-    }
-    return;
-  }
-
-  private static class DynamicColumns<K> {
-    static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
-    static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
-    String columnFamilyPrefix;
-    String type;
-    Set<K> columns;
-
-    public DynamicColumns(String columnFamilyPrefix, String type,
-        Set<K> keyValues) {
-      this.columnFamilyPrefix = columnFamilyPrefix;
-      this.columns = keyValues;
-      this.type = type;
-    }
-  }
-
-  private static <K> StringBuilder appendColumnsSQL(
-      StringBuilder colNames, DynamicColumns<K> cfInfo) {
-    // Prepare the sql template by iterating through all keys
-    for (K key : cfInfo.columns) {
-      colNames.append(",").append(cfInfo.columnFamilyPrefix)
-          .append(key.toString()).append(cfInfo.type);
-    }
-    return colNames;
-  }
-
-  private static <K, V> int setValuesForColumnFamily(
-      PreparedStatement ps, Map<K, V> keyValues, int startPos,
-      boolean converToBytes) throws SQLException {
-    int idx = startPos;
-    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
-      V value = entry.getValue();
-      if (value instanceof Collection) {
-        ps.setString(idx++, StringUtils.join(
-            (Collection) value, PHOENIX_STORAGE_SEPARATOR));
-      } else {
-        if (converToBytes) {
-          try {
-            ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
-          } catch (IOException ie) {
-            LOG.error("Exception in converting values into bytes "
-                + ie.getMessage());
-            throw new SQLException(ie);
-          }
-        } else {
-          ps.setString(idx++, value.toString());
-        }
-      }
-    }
-    return idx;
-  }
-
-  private static <K, V> int setBytesForColumnFamily(
-      PreparedStatement ps, Map<K, V> keyValues, int startPos)
-      throws SQLException {
-    return setValuesForColumnFamily(ps, keyValues, startPos, true);
-  }
-
-  private static <K, V> int setStringsForColumnFamily(
-      PreparedStatement ps, Map<K, V> keyValues, int startPos)
-      throws SQLException {
-    return setValuesForColumnFamily(ps, keyValues, startPos, false);
-  }
-
-  private static int setStringsForPrimaryKey(PreparedStatement ps,
-      TimelineCollectorContext context, TimelineEntity entity, int startPos)
-      throws SQLException {
-    int idx = startPos;
-    ps.setString(idx++, context.getClusterId());
-    ps.setString(idx++, context.getUserId());
-    ps.setString(idx++,
-        context.getFlowName());
-    ps.setString(idx++, context.getFlowVersion());
-    ps.setLong(idx++, context.getFlowRunId());
-    ps.setString(idx++, context.getAppId());
-    ps.setString(idx++, entity.getType());
-    ps.setString(idx++, entity.getId());
-    return idx;
-  }
-
-  private static void storeEntityVariableLengthFields(TimelineEntity entity,
-      TimelineCollectorContext context, Connection conn) throws SQLException {
-    int numPlaceholders = 0;
-    StringBuilder columnDefs = new StringBuilder(
-        StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
-    if (entity.getConfigs() != null) {
-      Set<String> keySet = entity.getConfigs().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          CONFIG_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (entity.getInfo() != null) {
-      Set<String> keySet = entity.getInfo().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (entity.getIsRelatedToEntities() != null) {
-      Set<String> keySet = entity.getIsRelatedToEntities().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          IS_RELATED_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (entity.getRelatesToEntities() != null) {
-      Set<String> keySet = entity.getRelatesToEntities().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          RELATES_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (numPlaceholders == 0) {
-      return;
-    }
-    StringBuilder placeholders = new StringBuilder();
-    placeholders.append(
-        StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length));
-    // numPlaceholders >= 1 now
-    placeholders.append("?")
-        .append(StringUtils.repeat(",?", numPlaceholders - 1));
-    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
-        .append(ENTITY_TABLE_NAME).append(" (").append(columnDefs)
-        .append(") VALUES(").append(placeholders).append(")").toString();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL statement for variable length fields: "
-          + sqlVariableLengthFields);
-    }
-    // Use try with resource statement for the prepared statement
-    try (PreparedStatement psVariableLengthFields =
-        conn.prepareStatement(sqlVariableLengthFields)) {
-      int idx = setStringsForPrimaryKey(
-          psVariableLengthFields, context, entity, 1);
-      if (entity.getConfigs() != null) {
-        idx = setStringsForColumnFamily(
-            psVariableLengthFields, entity.getConfigs(), idx);
-      }
-      if (entity.getInfo() != null) {
-        idx = setBytesForColumnFamily(
-            psVariableLengthFields, entity.getInfo(), idx);
-      }
-      if (entity.getIsRelatedToEntities() != null) {
-        idx = setStringsForColumnFamily(
-            psVariableLengthFields, entity.getIsRelatedToEntities(), idx);
-      }
-      if (entity.getRelatesToEntities() != null) {
-        idx = setStringsForColumnFamily(
-            psVariableLengthFields, entity.getRelatesToEntities(), idx);
-      }
-      psVariableLengthFields.execute();
-    }
-  }
-
-  private static void storeMetrics(TimelineEntity entity,
-      TimelineCollectorContext context, Connection conn) throws SQLException {
-    if (entity.getMetrics() == null) {
-      return;
-    }
-    Set<TimelineMetric> metrics = entity.getMetrics();
-    for (TimelineMetric metric : metrics) {
-      StringBuilder sqlColumns = new StringBuilder(
-          StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
-      sqlColumns.append(",")
-          .append(StringUtils.join(TIMELINE_METRIC_EXTRA_PK_LIST, ","));
-      sqlColumns.append(",").append("singledata, time");
-      StringBuilder placeholders = new StringBuilder();
-      placeholders.append(
-          StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
-          .append(StringUtils.repeat("?,", TIMELINE_METRIC_EXTRA_PK_LIST.length));
-      placeholders.append("?, ?");
-      String sqlMetric = new StringBuilder("UPSERT INTO ")
-          .append(METRIC_TABLE_NAME).append(" (").append(sqlColumns)
-          .append(") VALUES(").append(placeholders).append(")").toString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SQL statement for metric: " + sqlMetric);
-      }
-      try (PreparedStatement psMetrics = conn.prepareStatement(sqlMetric)) {
-        if (metric.getType().equals(TimelineMetric.Type.TIME_SERIES)) {
-          LOG.warn("The incoming timeline metric contains time series data, "
-              + "which is currently not supported by Phoenix storage. "
-              + "Time series will be truncated. ");
-        }
-        int idx = setStringsForPrimaryKey(psMetrics, context, entity, 1);
-        psMetrics.setString(idx++, metric.getId());
-        Iterator<Map.Entry<Long, Number>> currNumIter =
-            metric.getValues().entrySet().iterator();
-        if (currNumIter.hasNext()) {
-          // TODO: support time series storage
-          Map.Entry<Long, Number> currEntry = currNumIter.next();
-          psMetrics.setBytes(idx++,
-              GenericObjectMapper.write(currEntry.getValue()));
-          psMetrics.setLong(idx++, currEntry.getKey());
-        } else {
-          psMetrics.setBytes(idx++, GenericObjectMapper.write(null));
-          LOG.warn("The incoming metric contains an empty value set. ");
-        }
-        psMetrics.execute();
-      } catch (IOException ie) {
-        LOG.error("Exception on converting single data to bytes: "
-            + ie.getMessage());
-        throw new SQLException(ie);
-      }
-    }
-  }
-
-  private static void storeEvents(TimelineEntity entity,
-      TimelineCollectorContext context, Connection conn) throws SQLException {
-    if (entity.getEvents() == null) {
-      return;
-    }
-    Set<TimelineEvent> events = entity.getEvents();
-    for (TimelineEvent event : events) {
-      // We need this number to check if the incoming event's info field is empty
-      int numPlaceholders = 0;
-      StringBuilder sqlColumns = new StringBuilder(
-          StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
-      sqlColumns.append(",")
-          .append(StringUtils.join(TIMELINE_EVENT_EXTRA_PK_LIST, ","));
-      appendColumnsSQL(sqlColumns, new DynamicColumns<>(
-          EVENT_INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
-          event.getInfo().keySet()));
-      numPlaceholders += event.getInfo().keySet().size();
-      if (numPlaceholders == 0) {
-        continue;
-      }
-      StringBuilder placeholders = new StringBuilder();
-      placeholders.append(
-          StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
-          .append(StringUtils.repeat("?,", TIMELINE_EVENT_EXTRA_PK_LIST.length));
-      // numPlaceholders >= 1 now
-      placeholders.append("?")
-            .append(StringUtils.repeat(",?", numPlaceholders - 1));
-      String sqlEvents = new StringBuilder("UPSERT INTO ")
-          .append(EVENT_TABLE_NAME).append(" (").append(sqlColumns)
-          .append(") VALUES(").append(placeholders).append(")").toString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SQL statement for events: " + sqlEvents);
-      }
-      try (PreparedStatement psEvent = conn.prepareStatement(sqlEvents)) {
-        int idx = setStringsForPrimaryKey(psEvent, context, entity, 1);
-        psEvent.setLong(idx++, event.getTimestamp());
-        psEvent.setString(idx++, event.getId());
-        setBytesForColumnFamily(psEvent, event.getInfo(), idx);
-        psEvent.execute();
-      }
-    }
-  }
-
-  // WARNING: This method will permanently drop a table!
-  @Private
-  @VisibleForTesting
-  void dropTable(String tableName) throws Exception {
-    try (Connection conn = getConnection();
-         Statement stmt = conn.createStatement()) {
-      String sql = "DROP TABLE " + tableName;
-      stmt.executeUpdate(sql);
-    } catch (SQLException se) {
-      LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
-      throw se;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 3a22ed6..5120856 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -51,6 +53,7 @@ public class TimelineSchemaCreator {
 
   final static String NAME = TimelineSchemaCreator.class.getSimpleName();
   private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
+  private static final String PHOENIX_OPTION_SHORT = "p";
 
   public static void main(String[] args) throws Exception {
 
@@ -83,7 +86,41 @@ public class TimelineSchemaCreator {
       hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
           applicationTableName);
     }
-    createAllTables(hbaseConf);
+
+    List<Exception> exceptions = new ArrayList<>();
+    try {
+      createAllTables(hbaseConf);
+      LOG.info("Successfully created HBase schema. ");
+    } catch (IOException e) {
+      LOG.error("Error in creating hbase tables: " + e.getMessage());
+      exceptions.add(e);
+    }
+
+    // Create Phoenix data schema if needed
+    if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
+      Configuration phoenixConf = new Configuration();
+      try {
+        PhoenixOfflineAggregationWriterImpl phoenixWriter =
+            new PhoenixOfflineAggregationWriterImpl();
+        phoenixWriter.init(phoenixConf);
+        phoenixWriter.start();
+        phoenixWriter.createPhoenixTables();
+        phoenixWriter.stop();
+        LOG.info("Successfully created Phoenix offline aggregation schema. ");
+      } catch (IOException e) {
+        LOG.error("Error in creating phoenix tables: " + e.getMessage());
+        exceptions.add(e);
+      }
+    }
+    if (exceptions.size() > 0) {
+      LOG.warn("Schema creation finished with the following exceptions");
+      for (Exception e : exceptions) {
+        LOG.warn(e.getMessage());
+      }
+      System.exit(-1);
+    } else {
+      LOG.info("Schema creation finished successfully");
+    }
   }
 
   /**
@@ -115,6 +152,12 @@ public class TimelineSchemaCreator {
     o.setRequired(false);
     options.addOption(o);
 
+    o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
+        "create Phoenix offline aggregation tables");
+    // No need to set arg name since we do not need an argument here
+    o.setRequired(false);
+    options.addOption(o);
+
     CommandLineParser parser = new PosixParser();
     CommandLine commandLine = null;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
new file mode 100644
index 0000000..16c03a3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Class to carry the offline aggregation information for storage level
+ * implementations. There are currently two predefined aggregation info
+ * instances that represent flow and user level offline aggregations. Depending
+ * on its implementation, a storage class may use an OfflineAggregationInfo
+ * object to decide its behavior dynamically.
+ */
+public final class OfflineAggregationInfo {
+  /**
+   * Default flow level aggregation table name
+   */
+  @VisibleForTesting
+  public static final String FLOW_AGGREGATION_TABLE_NAME
+      = "yarn_timeline_flow_aggregation";
+  /**
+   * Default user level aggregation table name
+   */
+  public static final String USER_AGGREGATION_TABLE_NAME
+      = "yarn_timeline_user_aggregation";
+
+  // These lists have no effect on table creation.
+  private static final String[] FLOW_AGGREGATION_PK_LIST =
+      { "user", "cluster", "flow_name" };
+  private static final String[] USER_AGGREGATION_PK_LIST = { "user", "cluster"};
+
+  private final String tableName;
+  private final String[] primaryKeyList;
+  private final PrimaryKeyStringSetter primaryKeyStringSetter;
+
+  private OfflineAggregationInfo(String table, String[] pkList,
+      PrimaryKeyStringSetter formatter) {
+    tableName = table;
+    primaryKeyList = pkList;
+    primaryKeyStringSetter = formatter;
+  }
+
+  private interface PrimaryKeyStringSetter {
+    int setValues(PreparedStatement ps, TimelineCollectorContext context,
+        String[] extraInfo, int startPos) throws SQLException;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String[] getPrimaryKeyList() {
+    return primaryKeyList.clone();
+  }
+
+  public int setStringsForPrimaryKey(PreparedStatement ps,
+      TimelineCollectorContext context, String[] extraInfo, int startPos)
+      throws SQLException {
+    return primaryKeyStringSetter.setValues(ps, context, extraInfo, startPos);
+  }
+
+  public static final OfflineAggregationInfo FLOW_AGGREGATION =
+      new OfflineAggregationInfo(FLOW_AGGREGATION_TABLE_NAME,
+          FLOW_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
+        @Override
+        public int setValues(PreparedStatement ps,
+            TimelineCollectorContext context, String[] extraInfo, int startPos)
+            throws SQLException {
+          int idx = startPos;
+          ps.setString(idx++, context.getUserId());
+          ps.setString(idx++, context.getClusterId());
+          ps.setString(idx++, context.getFlowName());
+          return idx;
+        }
+      });
+
+  public static final OfflineAggregationInfo USER_AGGREGATION =
+      new OfflineAggregationInfo(USER_AGGREGATION_TABLE_NAME,
+          USER_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
+        @Override
+        public int setValues(PreparedStatement ps,
+            TimelineCollectorContext context, String[] extraInfo, int startPos)
+            throws SQLException {
+          int idx = startPos;
+          ps.setString(idx++, context.getUserId());
+          ps.setString(idx++, context.getClusterId());
+          return idx;
+        }
+      });
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
new file mode 100644
index 0000000..de66a17
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+
+public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
+  private static PhoenixOfflineAggregationWriterImpl storage;
+  private static final int BATCH_SIZE = 3;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    storage = setupPhoenixClusterAndWriterForTest(conf);
+  }
+
+  @Test(timeout = 90000)
+  public void testFlowLevelAggregationStorage() throws Exception {
+    testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
+  }
+
+  @Test(timeout = 90000)
+  public void testUserLevelAggregationStorage() throws Exception {
+    testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
+    storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
+    tearDownMiniCluster();
+  }
+
+  private static PhoenixOfflineAggregationWriterImpl
+    setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
+      throws Exception{
+    Map<String, String> props = new HashMap<>();
+    // Must update config before starting server
+    props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+        Boolean.FALSE.toString());
+    props.put("java.security.krb5.realm", "");
+    props.put("java.security.krb5.kdc", "");
+    props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
+        Boolean.FALSE.toString());
+    props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+    props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+    // Make a small batch size to test multiple calls to reserve sequences
+    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
+        Long.toString(BATCH_SIZE));
+    // Must update config before starting server
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+    // Change connection settings for test
+    conf.set(
+        YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
+        getUrl());
+    PhoenixOfflineAggregationWriterImpl
+        myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
+    myWriter.init(conf);
+    myWriter.start();
+    myWriter.createPhoenixTables();
+    return myWriter;
+  }
+
+  private static TimelineEntity getTestAggregationTimelineEntity() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello1";
+    String type = "testAggregationType";
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    entity.setModifiedTime(1425016502000L);
+
+    TimelineMetric metric = new TimelineMetric();
+    metric.setId("HDFS_BYTES_READ");
+    metric.addValue(1425016501100L, 8000);
+    entity.addMetric(metric);
+
+    return entity;
+  }
+
+  private void testAggregator(OfflineAggregationInfo aggregationInfo)
+      throws Exception {
+    // Set up a list of timeline entities and write them back to Phoenix
+    int numEntity = 1;
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(getTestAggregationTimelineEntity());
+    TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
+        "user1", "testFlow", null, 0, null);
+    storage.writeAggregatedEntity(context, te,
+        aggregationInfo);
+
+    // Verify if we're storing all entities
+    String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
+    String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
+        +") FROM " + aggregationInfo.getTableName();
+    verifySQLWithCount(sql, numEntity, "Number of entities should be ");
+    // Check metric
+    sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
+        + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
+    verifySQLWithCount(sql, numEntity,
+        "Number of entities with info should be ");
+  }
+
+
+  private void verifySQLWithCount(String sql, int targetCount, String message)
+      throws Exception {
+    try (
+        Statement stmt =
+          storage.getConnection().createStatement();
+        ResultSet rs = stmt.executeQuery(sql)) {
+      assertTrue("Result set empty on statement " + sql, rs.next());
+      assertNotNull("Fail to execute query " + sql, rs);
+      assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
+    } catch (SQLException se) {
+      fail("SQL exception on query: " + sql
+          + " With exception message: " + se.getLocalizedMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
deleted file mode 100644
index dece83d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-
-public class TestPhoenixTimelineWriterImpl extends BaseTest {
-  private static PhoenixTimelineWriterImpl writer;
-  private static final int BATCH_SIZE = 3;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    YarnConfiguration conf = new YarnConfiguration();
-    writer = setupPhoenixClusterAndWriterForTest(conf);
-  }
-
-  @Test(timeout = 90000)
-  public void testPhoenixWriterBasic() throws Exception {
-    // Set up a list of timeline entities and write them back to Phoenix
-    int numEntity = 12;
-    TimelineEntities te =
-        TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
-    writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te);
-    // Verify if we're storing all entities
-    String sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
-    verifySQLWithCount(sql, numEntity, "Number of entities should be ");
-    // Check config (half of all entities)
-    sql = "SELECT COUNT(c.config) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(c.config VARCHAR) ";
-    verifySQLWithCount(sql, (numEntity / 2),
-        "Number of entities with config should be ");
-    // Check info (half of all entities)
-    sql = "SELECT COUNT(i.info1) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(i.info1 VARBINARY) ";
-    verifySQLWithCount(sql, (numEntity / 2),
-        "Number of entities with info should be ");
-    // Check config and info (a quarter of all entities)
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
-        + "(c.config VARCHAR, i.info1 VARBINARY) "
-        + "WHERE c.config IS NOT NULL AND i.info1 IS NOT NULL";
-    verifySQLWithCount(sql, (numEntity / 4),
-        "Number of entities with both config and info should be ");
-    // Check relatesToEntities and isRelatedToEntities
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
-        + "(rt.testType VARCHAR, ir.testType VARCHAR) "
-        + "WHERE rt.testType IS NOT NULL AND ir.testType IS NOT NULL";
-    verifySQLWithCount(sql, numEntity - 2,
-        "Number of entities with both relatesTo and isRelatedTo should be ");
-    // Check event
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.EVENT_TABLE_NAME;
-    verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
-    // Check metrics
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.METRIC_TABLE_NAME;
-    verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
-  }
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
-    writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
-    writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
-    writer.serviceStop();
-    tearDownMiniCluster();
-  }
-
-  private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest(
-      YarnConfiguration conf) throws Exception{
-    Map<String, String> props = new HashMap<>();
-    // Must update config before starting server
-    props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
-        Boolean.FALSE.toString());
-    props.put("java.security.krb5.realm", "");
-    props.put("java.security.krb5.kdc", "");
-    props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
-        Boolean.FALSE.toString());
-    props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
-    props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
-    // Make a small batch size to test multiple calls to reserve sequences
-    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
-        Long.toString(BATCH_SIZE));
-    // Must update config before starting server
-    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-
-    PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
-    // Change connection settings for test
-    conf.set(
-        PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
-        getUrl());
-    myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-    myWriter.serviceInit(conf);
-    return myWriter;
-  }
-
-  private void verifySQLWithCount(String sql, int targetCount, String message)
-      throws Exception {
-    try (
-        Statement stmt =
-          writer.getConnection().createStatement();
-        ResultSet rs = stmt.executeQuery(sql)) {
-      assertTrue("Result set empty on statement " + sql, rs.next());
-      assertNotNull("Fail to execute query " + sql, rs);
-      assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
-    } catch (SQLException se) {
-      fail("SQL exception on query: " + sql
-          + " With exception message: " + se.getLocalizedMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87a00ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
deleted file mode 100644
index 7a7afc0..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-public class TestTimelineWriterImpl {
-  static TimelineEntities getStandardTestTimelineEntities(int listSize) {
-    TimelineEntities te = new TimelineEntities();
-    for (int i = 0; i < listSize; i++) {
-      TimelineEntity entity = new TimelineEntity();
-      String id = "hello" + i;
-      String type = "testType";
-      entity.setId(id);
-      entity.setType(type);
-      entity.setCreatedTime(1425016501000L + i);
-      entity.setModifiedTime(1425016502000L + i);
-      if (i > 0) {
-        entity.addRelatesToEntity(type, "hello" + i);
-        entity.addRelatesToEntity(type, "hello" + (i - 1));
-      }
-      if (i < listSize - 1) {
-        entity.addIsRelatedToEntity(type, "hello" + i);
-        entity.addIsRelatedToEntity(type, "hello" + (i + 1));
-      }
-      int category = i % 4;
-      switch (category) {
-      case 0:
-        entity.addConfig("config", "config" + i);
-        // Fall through deliberately
-      case 1:
-        entity.addInfo("info1", new Integer(i));
-        entity.addInfo("info2", "helloworld");
-        // Fall through deliberately
-      case 2:
-        break;
-      case 3:
-        entity.addConfig("config", "config" + i);
-        TimelineEvent event = new TimelineEvent();
-        event.setId("test event");
-        event.setTimestamp(1425016501100L + i);
-        event.addInfo("test_info", "content for " + entity.getId());
-        event.addInfo("test_info1", new Integer(i));
-        entity.addEvent(event);
-        TimelineMetric metric = new TimelineMetric();
-        metric.setId("HDFS_BYTES_READ");
-        metric.addValue(1425016501100L + i, 8000 + i);
-        entity.addMetric(metric);
-        break;
-      }
-      te.addEntity(entity);
-    }
-    return te;
-  }
-}