You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:49:03 UTC

[17/50] [abbrv] hadoop git commit: YARN-5045. hbase unit tests fail due to dependency issues. (Sangjin Lee via varunsaxena)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
deleted file mode 100644
index 58d5e61..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.ReadOnlyProps;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-
-public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
-  private static PhoenixOfflineAggregationWriterImpl storage;
-  private static final int BATCH_SIZE = 3;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    YarnConfiguration conf = new YarnConfiguration();
-    storage = setupPhoenixClusterAndWriterForTest(conf);
-  }
-
-  @Test(timeout = 90000)
-  public void testFlowLevelAggregationStorage() throws Exception {
-    testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
-  }
-
-  @Test(timeout = 90000)
-  public void testUserLevelAggregationStorage() throws Exception {
-    testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
-  }
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
-    storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
-    tearDownMiniCluster();
-  }
-
-  private static PhoenixOfflineAggregationWriterImpl
-    setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
-      throws Exception{
-    Map<String, String> props = new HashMap<>();
-    // Must update config before starting server
-    props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
-        Boolean.FALSE.toString());
-    props.put("java.security.krb5.realm", "");
-    props.put("java.security.krb5.kdc", "");
-    props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
-        Boolean.FALSE.toString());
-    props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
-    props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
-    // Make a small batch size to test multiple calls to reserve sequences
-    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
-        Long.toString(BATCH_SIZE));
-    // Must update config before starting server
-    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-
-    // Change connection settings for test
-    conf.set(
-        YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
-        getUrl());
-    PhoenixOfflineAggregationWriterImpl
-        myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
-    myWriter.init(conf);
-    myWriter.start();
-    myWriter.createPhoenixTables();
-    return myWriter;
-  }
-
-  private static TimelineEntity getTestAggregationTimelineEntity() {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "hello1";
-    String type = "testAggregationType";
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(1425016501000L);
-
-    TimelineMetric metric = new TimelineMetric();
-    metric.setId("HDFS_BYTES_READ");
-    metric.addValue(1425016501100L, 8000);
-    entity.addMetric(metric);
-
-    return entity;
-  }
-
-  private void testAggregator(OfflineAggregationInfo aggregationInfo)
-      throws Exception {
-    // Set up a list of timeline entities and write them back to Phoenix
-    int numEntity = 1;
-    TimelineEntities te = new TimelineEntities();
-    te.addEntity(getTestAggregationTimelineEntity());
-    TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
-        "user1", "testFlow", null, 0L, null);
-    storage.writeAggregatedEntity(context, te,
-        aggregationInfo);
-
-    // Verify if we're storing all entities
-    String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
-    String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
-        +") FROM " + aggregationInfo.getTableName();
-    verifySQLWithCount(sql, numEntity, "Number of entities should be ");
-    // Check metric
-    sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
-        + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
-    verifySQLWithCount(sql, numEntity,
-        "Number of entities with info should be ");
-  }
-
-
-  private void verifySQLWithCount(String sql, int targetCount, String message)
-      throws Exception {
-    try (
-        Statement stmt =
-          storage.getConnection().createStatement();
-        ResultSet rs = stmt.executeQuery(sql)) {
-      assertTrue("Result set empty on statement " + sql, rs.next());
-      assertNotNull("Fail to execute query " + sql, rs);
-      assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
-    } catch (SQLException se) {
-      fail("SQL exception on query: " + sql
-          + " With exception message: " + se.getLocalizedMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
deleted file mode 100644
index 3b8036d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Generates the data/entities for the FlowRun and FlowActivity Tables
- */
-class TestFlowDataGenerator {
-
-  private static final String metric1 = "MAP_SLOT_MILLIS";
-  private static final String metric2 = "HDFS_BYTES_READ";
-  public static final long END_TS_INCR = 10000L;
-
-  static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunMetrics_test";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    long cTime = 1425016501000L;
-    entity.setCreatedTime(cTime);
-
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId(metric1);
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = insertTs;
-
-    for (int k=1; k< 100 ; k++) {
-    metricValues.put(ts - k*200000, 20L);
-    }
-    metricValues.put(ts - 80000, 40L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-
-    TimelineMetric m2 = new TimelineMetric();
-    m2.setId(metric2);
-    metricValues = new HashMap<Long, Number>();
-    ts = System.currentTimeMillis();
-    for (int k=1; k< 100 ; k++) {
-      metricValues.put(ts - k*100000, 31L);
-    }
-
-    metricValues.put(ts - 80000, 57L);
-    m2.setType(Type.TIME_SERIES);
-    m2.setValues(metricValues);
-    metrics.add(m2);
-
-    entity.addMetrics(metrics);
-    return entity;
-  }
-
-
-  static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunMetrics_test";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    long cTime = 1425016501000L;
-    entity.setCreatedTime(cTime);
-
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId(metric1);
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = insertTs;
-
-    metricValues.put(ts - 80000, 40L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-
-    TimelineMetric m2 = new TimelineMetric();
-    m2.setId(metric2);
-    metricValues = new HashMap<Long, Number>();
-    ts = insertTs;
-    metricValues.put(ts - 80000, 57L);
-    m2.setType(Type.TIME_SERIES);
-    m2.setValues(metricValues);
-    metrics.add(m2);
-
-    entity.addMetrics(metrics);
-
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    event.setTimestamp(insertTs);
-    event.addInfo("done", "insertTs=" + insertTs);
-    entity.addEvent(event);
-    return entity;
-  }
-
-
-  static TimelineEntity getEntityMetricsApp1(long insertTs) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunMetrics_test";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    long cTime = 1425016501000L;
-    entity.setCreatedTime(cTime);
-
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId(metric1);
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = insertTs;
-    metricValues.put(ts - 100000, 2L);
-    metricValues.put(ts - 80000, 40L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-
-    TimelineMetric m2 = new TimelineMetric();
-    m2.setId(metric2);
-    metricValues = new HashMap<Long, Number>();
-    ts = insertTs;
-    metricValues.put(ts - 100000, 31L);
-    metricValues.put(ts - 80000, 57L);
-    m2.setType(Type.TIME_SERIES);
-    m2.setValues(metricValues);
-    metrics.add(m2);
-
-    entity.addMetrics(metrics);
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    long endTs = 1439379885000L;
-    event.setTimestamp(endTs);
-    String expKey = "foo_event_greater";
-    String expVal = "test_app_greater";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-    return entity;
-  }
-
-
-  static TimelineEntity getEntityMetricsApp2(long insertTs) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunMetrics_test";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    long cTime = 1425016501000L;
-    entity.setCreatedTime(cTime);
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId(metric1);
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = insertTs;
-    metricValues.put(ts - 100000, 5L);
-    metricValues.put(ts - 80000, 101L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-    entity.addMetrics(metrics);
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    long endTs = 1439379885000L;
-    event.setTimestamp(endTs);
-    String expKey = "foo_event_greater";
-    String expVal = "test_app_greater";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-    return entity;
-  }
-
-  static TimelineEntity getEntity1() {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunHello";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    long cTime = 1425026901000L;
-    entity.setCreatedTime(cTime);
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId(metric1);
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = System.currentTimeMillis();
-    metricValues.put(ts - 120000, 100000000L);
-    metricValues.put(ts - 100000, 200000000L);
-    metricValues.put(ts - 80000, 300000000L);
-    metricValues.put(ts - 60000, 400000000L);
-    metricValues.put(ts - 40000, 50000000000L);
-    metricValues.put(ts - 20000, 60000000000L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-    entity.addMetrics(metrics);
-
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event.setTimestamp(cTime);
-    String expKey = "foo_event";
-    Object expVal = "test";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-
-    event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    long expTs = cTime + 21600000;// start time + 6hrs
-    event.setTimestamp(expTs);
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-
-    return entity;
-  }
-
-  static TimelineEntity getAFullEntity(long ts, long endTs) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunFullEntity";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(ts);
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId(metric1);
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    metricValues.put(ts - 120000, 100000000L);
-    metricValues.put(ts - 100000, 200000000L);
-    metricValues.put(ts - 80000, 300000000L);
-    metricValues.put(ts - 60000, 400000000L);
-    metricValues.put(ts - 40000, 50000000000L);
-    metricValues.put(ts - 20000, 60000000000L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-    TimelineMetric m2 = new TimelineMetric();
-    m2.setId(metric2);
-    metricValues = new HashMap<Long, Number>();
-    metricValues.put(ts - 900000, 31L);
-    metricValues.put(ts - 30000, 57L);
-    m2.setType(Type.TIME_SERIES);
-    m2.setValues(metricValues);
-    metrics.add(m2);
-    entity.addMetrics(metrics);
-
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event.setTimestamp(ts);
-    String expKey = "foo_event";
-    Object expVal = "test";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-
-    event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    long expTs = ts + 21600000;// start time + 6hrs
-    event.setTimestamp(expTs);
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-
-    return entity;
-  }
-
-  static TimelineEntity getEntityGreaterStartTime(long startTs) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setCreatedTime(startTs);
-    entity.setId("flowRunHello with greater start time");
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setType(type);
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event.setTimestamp(startTs);
-    String expKey = "foo_event_greater";
-    String expVal = "test_app_greater";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-    return entity;
-  }
-
-  static TimelineEntity getEntityMaxEndTime(long endTs) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setId("flowRunHello Max End time");
-    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    event.setTimestamp(endTs);
-    String expKey = "foo_even_max_ finished";
-    String expVal = "test_app_max_finished";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-    return entity;
-  }
-
-  static TimelineEntity getEntityMinStartTime(long startTs) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunHelloMInStartTime";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(startTs);
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event.setTimestamp(startTs);
-    entity.addEvent(event);
-    return entity;
-  }
-
-  static TimelineEntity getMinFlushEntity(long startTs) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunHelloFlushEntityMin";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(startTs);
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event.setTimestamp(startTs);
-    entity.addEvent(event);
-    return entity;
-  }
-
-  static TimelineEntity getMaxFlushEntity(long startTs) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunHelloFlushEntityMax";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(startTs);
-
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    event.setTimestamp(startTs + END_TS_INCR);
-    entity.addEvent(event);
-    return entity;
-  }
-
-  static TimelineEntity getFlowApp1(long appCreatedTime) {
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowActivity_test";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(appCreatedTime);
-
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event.setTimestamp(appCreatedTime);
-    String expKey = "foo_event";
-    Object expVal = "test";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-
-    return entity;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
deleted file mode 100644
index 6b23b6c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests the FlowRun and FlowActivity Tables
- */
-public class TestHBaseStorageFlowActivity {
-
-  private static HBaseTestingUtility util;
-
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    util = new HBaseTestingUtility();
-    Configuration conf = util.getConfiguration();
-    conf.setInt("hfile.format.version", 3);
-    util.startMiniCluster();
-    createSchema();
-  }
-
-  private static void createSchema() throws IOException {
-    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
-  }
-
-  /**
-   * Writes 4 timeline entities belonging to one flow run through the
-   * {@link HBaseTimelineWriterImpl}
-   *
-   * Checks the flow run table contents
-   *
-   * The first entity has a created event, metrics and a finish event.
-   *
-   * The second entity has a created event and this is the entity with smallest
-   * start time. This should be the start time for the flow run.
-   *
-   * The third entity has a finish event and this is the entity with the max end
-   * time. This should be the end time for the flow run.
-   *
-   * The fourth entity has a created event which has a start time that is
-   * greater than min start time.
-   *
-   * The test also checks in the flow activity table that one entry has been
-   * made for all of these 4 application entities since they belong to the same
-   * flow run.
-   */
-  @Test
-  public void testWriteFlowRunMinMax() throws Exception {
-
-    TimelineEntities te = new TimelineEntities();
-    te.addEntity(TestFlowDataGenerator.getEntity1());
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
-    String user = "testWriteFlowRunMinMaxToHBase_user1";
-    String flow = "testing_flowRun_flow_name";
-    String flowVersion = "CF7022C10F1354";
-    long runid = 1002345678919L;
-    String appName = "application_100000000000_1111";
-    long minStartTs = 1424995200300L;
-    long greaterStartTs = 1424995200300L + 864000L;
-    long endTs = 1424995200300L + 86000000L;;
-    TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime(minStartTs);
-
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // write another entity with the right min start time
-      te = new TimelineEntities();
-      te.addEntity(entityMinStartTime);
-      appName = "application_100000000000_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // writer another entity for max end time
-      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
-          .getEntityMaxEndTime(endTs);
-      te = new TimelineEntities();
-      te.addEntity(entityMaxEndTime);
-      appName = "application_100000000000_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // writer another entity with greater start time
-      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime(greaterStartTs);
-      te = new TimelineEntities();
-      te.addEntity(entityGreaterStartTime);
-      appName = "application_1000000000000000_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // flush everything to hbase
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-
-    Connection conn = ConnectionFactory.createConnection(c1);
-    // check in flow activity table
-    Table table1 = conn.getTable(TableName
-        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
-    byte[] startRow =
-        FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
-    Get g = new Get(startRow);
-    Result r1 = table1.get(g);
-    assertNotNull(r1);
-    assertTrue(!r1.isEmpty());
-    Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
-        .getBytes());
-    assertEquals(1, values.size());
-    byte[] row = r1.getRow();
-    FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row);
-    assertNotNull(flowActivityRowKey);
-    assertEquals(cluster, flowActivityRowKey.getClusterId());
-    assertEquals(user, flowActivityRowKey.getUserId());
-    assertEquals(flow, flowActivityRowKey.getFlowName());
-    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
-    assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
-    assertEquals(1, values.size());
-    checkFlowActivityRunId(runid, flowVersion, values);
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      // get the flow activity entity
-      Set<TimelineEntity> entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, null, null, null, null,
-          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
-          new TimelineEntityFilters(10L, null, null, null, null, null,
-          null, null, null),
-          new TimelineDataToRetrieve());
-      assertEquals(1, entities.size());
-      for (TimelineEntity e : entities) {
-        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
-        assertEquals(cluster, flowActivity.getCluster());
-        assertEquals(user, flowActivity.getUser());
-        assertEquals(flow, flowActivity.getFlowName());
-        assertEquals(dayTs, flowActivity.getDate().getTime());
-        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
-        assertEquals(1, flowRuns.size());
-      }
-    } finally {
-      hbr.close();
-    }
-  }
-
-  /**
-   * Write 1 application entity and checks the record for today in the flow
-   * activity table
-   */
-  @Test
-  public void testWriteFlowActivityOneFlow() throws Exception {
-    String cluster = "testWriteFlowActivityOneFlow_cluster1";
-    String user = "testWriteFlowActivityOneFlow_user1";
-    String flow = "flow_activity_test_flow_name";
-    String flowVersion = "A122110F135BC4";
-    long runid = 1001111178919L;
-
-    TimelineEntities te = new TimelineEntities();
-    long appCreatedTime = 1425016501000L;
-    TimelineEntity entityApp1 =
-        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
-    te.addEntity(entityApp1);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      String appName = "application_1111999999_1234";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-    // check flow activity
-    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
-        appCreatedTime);
-
-    // use the reader to verify the data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-
-      Set<TimelineEntity> entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
-          new TimelineEntityFilters(10L, null, null, null, null, null,
-          null, null, null),
-          new TimelineDataToRetrieve());
-      assertEquals(1, entities.size());
-      for (TimelineEntity e : entities) {
-        FlowActivityEntity entity = (FlowActivityEntity)e;
-        NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
-        assertEquals(1, flowRuns.size());
-        for (FlowRunEntity flowRun : flowRuns) {
-          assertEquals(runid, flowRun.getRunId());
-          assertEquals(flowVersion, flowRun.getVersion());
-        }
-      }
-    } finally {
-      hbr.close();
-    }
-  }
-
-  private void checkFlowActivityTable(String cluster, String user, String flow,
-      String flowVersion, long runid, Configuration c1, long appCreatedTime)
-          throws IOException {
-    Scan s = new Scan();
-    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
-    byte[] startRow =
-        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
-    s.setStartRow(startRow);
-    String clusterStop = cluster + "1";
-    byte[] stopRow =
-        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
-    s.setStopRow(stopRow);
-    Connection conn = ConnectionFactory.createConnection(c1);
-    Table table1 = conn.getTable(TableName
-        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
-    ResultScanner scanner = table1.getScanner(s);
-    int rowCount = 0;
-    for (Result result : scanner) {
-      assertNotNull(result);
-      assertTrue(!result.isEmpty());
-      Map<byte[], byte[]> values = result
-          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
-      rowCount++;
-      byte[] row = result.getRow();
-      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
-          .parseRowKey(row);
-      assertNotNull(flowActivityRowKey);
-      assertEquals(cluster, flowActivityRowKey.getClusterId());
-      assertEquals(user, flowActivityRowKey.getUserId());
-      assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
-      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
-      assertEquals(1, values.size());
-      checkFlowActivityRunId(runid, flowVersion, values);
-    }
-    assertEquals(1, rowCount);
-  }
-
-  /**
-   * Writes 3 applications each with a different run id and version for the same
-   * {cluster, user, flow}
-   *
-   * They should be getting inserted into one record in the flow activity table
-   * with 3 columns, one per run id
-   */
-  @Test
-  public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
-    String cluster = "testManyRunsFlowActivity_cluster1";
-    String user = "testManyRunsFlowActivity_c_user1";
-    String flow = "flow_activity_test_flow_name";
-    String flowVersion1 = "A122110F135BC4";
-    long runid1 = 11111111111L;
-
-    String flowVersion2 = "A12222222222C4";
-    long runid2 = 2222222222222L;
-
-    String flowVersion3 = "A1333333333C4";
-    long runid3 = 3333333333333L;
-
-    TimelineEntities te = new TimelineEntities();
-    long appCreatedTime = 1425016501000L;
-    TimelineEntity entityApp1 =
-        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
-    te.addEntity(entityApp1);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      String appName = "application_11888888888_1111";
-      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
-
-      // write an application with to this flow but a different runid/ version
-      te = new TimelineEntities();
-      te.addEntity(entityApp1);
-      appName = "application_11888888888_2222";
-      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
-
-      // write an application with to this flow but a different runid/ version
-      te = new TimelineEntities();
-      te.addEntity(entityApp1);
-      appName = "application_11888888888_3333";
-      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
-
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-    // check flow activity
-    checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
-        runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-
-      Set<TimelineEntity> entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, null, null, null, null,
-          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
-          new TimelineEntityFilters(10L, null, null, null, null, null,
-          null, null, null),
-          new TimelineDataToRetrieve());
-      assertEquals(1, entities.size());
-      for (TimelineEntity e : entities) {
-        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
-        assertEquals(cluster, flowActivity.getCluster());
-        assertEquals(user, flowActivity.getUser());
-        assertEquals(flow, flowActivity.getFlowName());
-        long dayTs =
-            TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
-        assertEquals(dayTs, flowActivity.getDate().getTime());
-        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
-        assertEquals(3, flowRuns.size());
-        for (FlowRunEntity flowRun : flowRuns) {
-          long runId = flowRun.getRunId();
-          String version = flowRun.getVersion();
-          if (runId == runid1) {
-            assertEquals(flowVersion1, version);
-          } else if (runId == runid2) {
-            assertEquals(flowVersion2, version);
-          } else if (runId == runid3) {
-            assertEquals(flowVersion3, version);
-          } else {
-            fail("unknown run id: " + runId);
-          }
-        }
-      }
-    } finally {
-      hbr.close();
-    }
-  }
-
-  private void checkFlowActivityTableSeveralRuns(String cluster, String user,
-      String flow, Configuration c1, String flowVersion1, long runid1,
-      String flowVersion2, long runid2, String flowVersion3, long runid3,
-      long appCreatedTime)
-      throws IOException {
-    Scan s = new Scan();
-    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
-    byte[] startRow =
-        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
-    s.setStartRow(startRow);
-    String clusterStop = cluster + "1";
-    byte[] stopRow =
-        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
-    s.setStopRow(stopRow);
-    Connection conn = ConnectionFactory.createConnection(c1);
-    Table table1 = conn.getTable(TableName
-        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
-    ResultScanner scanner = table1.getScanner(s);
-    int rowCount = 0;
-    for (Result result : scanner) {
-      assertNotNull(result);
-      assertTrue(!result.isEmpty());
-      byte[] row = result.getRow();
-      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
-          .parseRowKey(row);
-      assertNotNull(flowActivityRowKey);
-      assertEquals(cluster, flowActivityRowKey.getClusterId());
-      assertEquals(user, flowActivityRowKey.getUserId());
-      assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
-      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
-
-      Map<byte[], byte[]> values = result
-          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
-      rowCount++;
-      assertEquals(3, values.size());
-      checkFlowActivityRunId(runid1, flowVersion1, values);
-      checkFlowActivityRunId(runid2, flowVersion2, values);
-      checkFlowActivityRunId(runid3, flowVersion3, values);
-    }
-    // the flow activity table is such that it will insert
-    // into current day's record
-    // hence, if this test runs across the midnight boundary,
-    // it may fail since it would insert into two records
-    // one for each day
-    assertEquals(1, rowCount);
-  }
-
-  private void checkFlowActivityRunId(long runid, String flowVersion,
-      Map<byte[], byte[]> values) throws IOException {
-    byte[] rq = ColumnHelper.getColumnQualifier(
-        FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
-        GenericObjectMapper.write(runid));
-    for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
-      String actualQ = Bytes.toString(k.getKey());
-      if (Bytes.toString(rq).equals(actualQ)) {
-        String actualV = (String) GenericObjectMapper.read(k.getValue());
-        assertEquals(flowVersion, actualV);
-      }
-    }
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    util.shutdownMiniCluster();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
deleted file mode 100644
index 801d43c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ /dev/null
@@ -1,851 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests the FlowRun and FlowActivity Tables
- */
-public class TestHBaseStorageFlowRun {
-
-  private static HBaseTestingUtility util;
-
-  private final String metric1 = "MAP_SLOT_MILLIS";
-  private final String metric2 = "HDFS_BYTES_READ";
-
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    util = new HBaseTestingUtility();
-    Configuration conf = util.getConfiguration();
-    conf.setInt("hfile.format.version", 3);
-    util.startMiniCluster();
-    createSchema();
-  }
-
-  private static void createSchema() throws IOException {
-    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
-  }
-
-  @Test
-  public void checkCoProcessorOff() throws IOException, InterruptedException {
-    Configuration hbaseConf = util.getConfiguration();
-    TableName table = TableName.valueOf(hbaseConf.get(
-        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
-    Connection conn = null;
-    conn = ConnectionFactory.createConnection(hbaseConf);
-    Admin admin = conn.getAdmin();
-    if (admin == null) {
-      throw new IOException("Can't check tables since admin is null");
-    }
-    if (admin.tableExists(table)) {
-      // check the regions.
-      // check in flow run table
-      util.waitUntilAllRegionsAssigned(table);
-      HRegionServer server = util.getRSForFirstRegionInTable(table);
-      List<HRegion> regions = server.getOnlineRegions(table);
-      for (HRegion region : regions) {
-        assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
-            hbaseConf));
-      }
-    }
-
-    table = TableName.valueOf(hbaseConf.get(
-        FlowActivityTable.TABLE_NAME_CONF_NAME,
-        FlowActivityTable.DEFAULT_TABLE_NAME));
-    if (admin.tableExists(table)) {
-      // check the regions.
-      // check in flow activity table
-      util.waitUntilAllRegionsAssigned(table);
-      HRegionServer server = util.getRSForFirstRegionInTable(table);
-      List<HRegion> regions = server.getOnlineRegions(table);
-      for (HRegion region : regions) {
-        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
-            hbaseConf));
-      }
-    }
-
-    table = TableName.valueOf(hbaseConf.get(
-        EntityTable.TABLE_NAME_CONF_NAME,
-        EntityTable.DEFAULT_TABLE_NAME));
-    if (admin.tableExists(table)) {
-      // check the regions.
-      // check in entity run table
-      util.waitUntilAllRegionsAssigned(table);
-      HRegionServer server = util.getRSForFirstRegionInTable(table);
-      List<HRegion> regions = server.getOnlineRegions(table);
-      for (HRegion region : regions) {
-        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
-            hbaseConf));
-      }
-    }
-  }
-
-  /**
-   * Writes 4 timeline entities belonging to one flow run through the
-   * {@link HBaseTimelineWriterImpl}
-   *
-   * Checks the flow run table contents
-   *
-   * The first entity has a created event, metrics and a finish event.
-   *
-   * The second entity has a created event and this is the entity with smallest
-   * start time. This should be the start time for the flow run.
-   *
-   * The third entity has a finish event and this is the entity with the max end
-   * time. This should be the end time for the flow run.
-   *
-   * The fourth entity has a created event which has a start time that is
-   * greater than min start time.
-   *
-   */
-  @Test
-  public void testWriteFlowRunMinMax() throws Exception {
-
-    TimelineEntities te = new TimelineEntities();
-    te.addEntity(TestFlowDataGenerator.getEntity1());
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
-    String user = "testWriteFlowRunMinMaxToHBase_user1";
-    String flow = "testing_flowRun_flow_name";
-    String flowVersion = "CF7022C10F1354";
-    long runid = 1002345678919L;
-    String appName = "application_100000000000_1111";
-    long minStartTs = 1425026900000L;
-    long greaterStartTs = 30000000000000L;
-    long endTs = 1439750690000L;
-    TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime(minStartTs);
-
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // write another entity with the right min start time
-      te = new TimelineEntities();
-      te.addEntity(entityMinStartTime);
-      appName = "application_100000000000_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // writer another entity for max end time
-      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
-          .getEntityMaxEndTime(endTs);
-      te = new TimelineEntities();
-      te.addEntity(entityMaxEndTime);
-      appName = "application_100000000000_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // writer another entity with greater start time
-      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime(greaterStartTs);
-      te = new TimelineEntities();
-      te.addEntity(entityGreaterStartTime);
-      appName = "application_1000000000000000_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
-      // flush everything to hbase
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-
-    Connection conn = ConnectionFactory.createConnection(c1);
-    // check in flow run table
-    Table table1 = conn.getTable(TableName
-        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
-    // scan the table and see that we get back the right min and max
-    // timestamps
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
-    Get g = new Get(startRow);
-    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
-        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
-    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
-        FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
-    Result r1 = table1.get(g);
-    assertNotNull(r1);
-    assertTrue(!r1.isEmpty());
-    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
-        .getBytes());
-
-    assertEquals(2, r1.size());
-    long starttime = Bytes.toLong(values.get(
-        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
-    assertEquals(minStartTs, starttime);
-    assertEquals(endTs, Bytes.toLong(values
-        .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      // get the flow run entity
-      TimelineEntity entity = hbr.getEntity(
-          new TimelineReaderContext(cluster, user, flow, runid, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineDataToRetrieve());
-      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
-      FlowRunEntity flowRun = (FlowRunEntity)entity;
-      assertEquals(minStartTs, flowRun.getStartTime());
-      assertEquals(endTs, flowRun.getMaxEndTime());
-    } finally {
-      hbr.close();
-    }
-  }
-
-  /**
-   * Writes two application entities of the same flow run. Each application has
-   * two metrics: slot millis and hdfs bytes read. Each metric has values at two
-   * timestamps.
-   *
-   * Checks the metric values of the flow in the flow run table. Flow metric
-   * values should be the sum of individual metric values that belong to the
-   * latest timestamp for that metric
-   */
-  @Test
-  public void testWriteFlowRunMetricsOneFlow() throws Exception {
-    String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
-    String user = "testWriteFlowRunMetricsOneFlow_user1";
-    String flow = "testing_flowRun_metrics_flow_name";
-    String flowVersion = "CF7022C10F1354";
-    long runid = 1002345678919L;
-
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator
-        .getEntityMetricsApp1(System.currentTimeMillis());
-    te.addEntity(entityApp1);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      // write another application with same metric to this flow
-      te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator
-          .getEntityMetricsApp2(System.currentTimeMillis());
-      te.addEntity(entityApp2);
-      appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-
-    // check flow run
-    checkFlowRunTable(cluster, user, flow, runid, c1);
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      TimelineEntity entity = hbr.getEntity(
-          new TimelineReaderContext(cluster, user, flow, runid, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineDataToRetrieve());
-      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
-      Set<TimelineMetric> metrics = entity.getMetrics();
-      assertEquals(2, metrics.size());
-      for (TimelineMetric metric : metrics) {
-        String id = metric.getId();
-        Map<Long, Number> values = metric.getValues();
-        assertEquals(1, values.size());
-        Number value = null;
-        for (Number n : values.values()) {
-          value = n;
-        }
-        switch (id) {
-        case metric1:
-          assertEquals(141L, value);
-          break;
-        case metric2:
-          assertEquals(57L, value);
-          break;
-        default:
-          fail("unrecognized metric: " + id);
-        }
-      }
-    } finally {
-      hbr.close();
-    }
-  }
-
-  private void checkFlowRunTable(String cluster, String user, String flow,
-      long runid, Configuration c1) throws IOException {
-    Scan s = new Scan();
-    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
-    s.setStartRow(startRow);
-    String clusterStop = cluster + "1";
-    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
-    s.setStopRow(stopRow);
-    Connection conn = ConnectionFactory.createConnection(c1);
-    Table table1 = conn.getTable(TableName
-        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
-    ResultScanner scanner = table1.getScanner(s);
-
-    int rowCount = 0;
-    for (Result result : scanner) {
-      assertNotNull(result);
-      assertTrue(!result.isEmpty());
-      Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
-          .getBytes());
-      rowCount++;
-      // check metric1
-      byte[] q = ColumnHelper.getColumnQualifier(
-          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
-      assertTrue(values.containsKey(q));
-      assertEquals(141L, Bytes.toLong(values.get(q)));
-
-      // check metric2
-      assertEquals(3, values.size());
-      q = ColumnHelper.getColumnQualifier(
-          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
-      assertTrue(values.containsKey(q));
-      assertEquals(57L, Bytes.toLong(values.get(q)));
-    }
-    assertEquals(1, rowCount);
-  }
-
-  @Test
-  public void testWriteFlowRunMetricsPrefix() throws Exception {
-    String cluster = "testWriteFlowRunMetricsPrefix_cluster1";
-    String user = "testWriteFlowRunMetricsPrefix_user1";
-    String flow = "testWriteFlowRunMetricsPrefix_flow_name";
-    String flowVersion = "CF7022C10F1354";
-
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator
-        .getEntityMetricsApp1(System.currentTimeMillis());
-    te.addEntity(entityApp1);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te);
-      // write another application with same metric to this flow
-      te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator
-          .getEntityMetricsApp2(System.currentTimeMillis());
-      te.addEntity(entityApp2);
-      appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      TimelineFilterList metricsToRetrieve = new TimelineFilterList(
-          Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
-              metric1.substring(0, metric1.indexOf("_") + 1)));
-      TimelineEntity entity = hbr.getEntity(
-          new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineDataToRetrieve(null, metricsToRetrieve, null));
-      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
-      Set<TimelineMetric> metrics = entity.getMetrics();
-      assertEquals(1, metrics.size());
-      for (TimelineMetric metric : metrics) {
-        String id = metric.getId();
-        Map<Long, Number> values = metric.getValues();
-        assertEquals(1, values.size());
-        Number value = null;
-        for (Number n : values.values()) {
-          value = n;
-        }
-        switch (id) {
-        case metric1:
-          assertEquals(40L, value);
-          break;
-        default:
-          fail("unrecognized metric: " + id);
-        }
-      }
-
-      Set<TimelineEntity> entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(),
-          new TimelineDataToRetrieve(null, metricsToRetrieve, null));
-      assertEquals(2, entities.size());
-      int metricCnt = 0;
-      for (TimelineEntity timelineEntity : entities) {
-        metricCnt += timelineEntity.getMetrics().size();
-      }
-      assertEquals(2, metricCnt);
-    } finally {
-      hbr.close();
-    }
-  }
-
-  @Test
-  public void testWriteFlowRunsMetricFields() throws Exception {
-    String cluster = "testWriteFlowRunsMetricFields_cluster1";
-    String user = "testWriteFlowRunsMetricFields_user1";
-    String flow = "testWriteFlowRunsMetricFields_flow_name";
-    String flowVersion = "CF7022C10F1354";
-    long runid = 1002345678919L;
-
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator
-        .getEntityMetricsApp1(System.currentTimeMillis());
-    te.addEntity(entityApp1);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      // write another application with same metric to this flow
-      te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator
-          .getEntityMetricsApp2(System.currentTimeMillis());
-      te.addEntity(entityApp2);
-      appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-
-    // check flow run
-    checkFlowRunTable(cluster, user, flow, runid, c1);
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      Set<TimelineEntity> entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, runid, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(),
-          new TimelineDataToRetrieve());
-      assertEquals(1, entities.size());
-      for (TimelineEntity timelineEntity : entities) {
-        assertEquals(0, timelineEntity.getMetrics().size());
-      }
-
-      entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, runid, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
-      assertEquals(1, entities.size());
-      for (TimelineEntity timelineEntity : entities) {
-        Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
-        assertEquals(2, timelineMetrics.size());
-        for (TimelineMetric metric : timelineMetrics) {
-          String id = metric.getId();
-          Map<Long, Number> values = metric.getValues();
-          assertEquals(1, values.size());
-          Number value = null;
-          for (Number n : values.values()) {
-            value = n;
-          }
-          switch (id) {
-          case metric1:
-            assertEquals(141L, value);
-            break;
-          case metric2:
-            assertEquals(57L, value);
-            break;
-          default:
-            fail("unrecognized metric: " + id);
-          }
-        }
-      }
-    } finally {
-      hbr.close();
-    }
-  }
-
-  @Test
-  public void testWriteFlowRunFlush() throws Exception {
-    String cluster = "atestFlushFlowRun_cluster1";
-    String user = "atestFlushFlowRun__user1";
-    String flow = "atestFlushFlowRun_flow_name";
-    String flowVersion = "AF1021C19F1351";
-    long runid = 1449526652000L;
-
-    int start = 10;
-    int count = 20000;
-    int appIdSuffix = 1;
-    HBaseTimelineWriterImpl hbi = null;
-    long insertTs = 1449796654827L - count;
-    long minTS = insertTs + 1;
-    long startTs = insertTs;
-    Configuration c1 = util.getConfiguration();
-    TimelineEntities te1 = null;
-    TimelineEntity entityApp1 = null;
-    TimelineEntity entityApp2 = null;
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-
-      for (int i = start; i < count; i++) {
-        String appName = "application_1060350000000_" + appIdSuffix;
-        insertTs++;
-        te1 = new TimelineEntities();
-        entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
-        te1.addEntity(entityApp1);
-        entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
-        te1.addEntity(entityApp2);
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
-        Thread.sleep(1);
-
-        appName = "application_1001199480000_7" + appIdSuffix;
-        insertTs++;
-        appIdSuffix++;
-        te1 = new TimelineEntities();
-        entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
-        te1.addEntity(entityApp1);
-        entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
-        te1.addEntity(entityApp2);
-
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
-        if (i % 1000 == 0) {
-          hbi.flush();
-          checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
-              runid, false);
-        }
-      }
-    } finally {
-      hbi.flush();
-      hbi.close();
-      checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
-          true);
-    }
-  }
-
-  private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
-      int count, String cluster, String user, String flow, long runid,
-      boolean checkMax) throws IOException {
-    Connection conn = ConnectionFactory.createConnection(c1);
-    // check in flow run table
-    Table table1 = conn.getTable(TableName
-        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
-    // scan the table and see that we get back the right min and max
-    // timestamps
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
-    Get g = new Get(startRow);
-    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
-        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
-    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
-        FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
-
-    Result r1 = table1.get(g);
-    assertNotNull(r1);
-    assertTrue(!r1.isEmpty());
-    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
-        .getBytes());
-    int start = 10;
-    assertEquals(2, r1.size());
-    long starttime = Bytes.toLong(values
-        .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
-    assertEquals(minTS, starttime);
-    if (checkMax) {
-      assertEquals(startTs + 2 * (count - start)
-          + TestFlowDataGenerator.END_TS_INCR,
-          Bytes.toLong(values
-          .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
-    }
-  }
-
-  @Test
-  public void testFilterFlowRunsByCreatedTime() throws Exception {
-    String cluster = "cluster2";
-    String user = "user2";
-    String flow = "flow_name2";
-
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
-        System.currentTimeMillis());
-    entityApp1.setCreatedTime(1425016501000L);
-    te.addEntity(entityApp1);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
-          "application_11111111111111_1111", te);
-      // write another application with same metric to this flow
-      te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
-          System.currentTimeMillis());
-      entityApp2.setCreatedTime(1425016502000L);
-      te.addEntity(entityApp2);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
-          "application_11111111111111_2222", te);
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-
-      Set<TimelineEntity> entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow,
-          null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, 1425016501000L, 1425016502001L, null,
-          null, null, null, null, null), new TimelineDataToRetrieve());
-      assertEquals(2, entities.size());
-      for (TimelineEntity entity : entities) {
-        if (!entity.getId().equals("user2@flow_name2/1002345678918") &&
-            !entity.getId().equals("user2@flow_name2/1002345678919")) {
-          fail("Entities with flow runs 1002345678918 and 1002345678919" +
-              "should be present.");
-        }
-      }
-      entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, 1425016501050L, null, null, null,
-          null, null, null, null), new TimelineDataToRetrieve());
-      assertEquals(1, entities.size());
-      for (TimelineEntity entity : entities) {
-        if (!entity.getId().equals("user2@flow_name2/1002345678918")) {
-          fail("Entity with flow run 1002345678918 should be present.");
-        }
-      }
-      entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, null, 1425016501050L, null, null,
-          null, null, null, null), new TimelineDataToRetrieve());
-      assertEquals(1, entities.size());
-      for (TimelineEntity entity : entities) {
-        if (!entity.getId().equals("user2@flow_name2/1002345678919")) {
-          fail("Entity with flow run 1002345678919 should be present.");
-        }
-      }
-    } finally {
-      hbr.close();
-    }
-  }
-
-  @Test
-  public void testMetricFilters() throws Exception {
-    String cluster = "cluster1";
-    String user = "user1";
-    String flow = "flow_name1";
-
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
-        System.currentTimeMillis());
-    te.addEntity(entityApp1);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
-          "application_11111111111111_1111", te);
-      // write another application with same metric to this flow
-      te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
-          System.currentTimeMillis());
-      te.addEntity(entityApp2);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
-          "application_11111111111111_2222", te);
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-
-    // use the timeline reader to verify data
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-
-      TimelineFilterList list1 = new TimelineFilterList();
-      list1.addFilter(new TimelineCompareFilter(
-          TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
-      TimelineFilterList list2 = new TimelineFilterList();
-      list2.addFilter(new TimelineCompareFilter(
-          TimelineCompareOp.LESS_THAN, metric1, 43));
-      list2.addFilter(new TimelineCompareFilter(
-          TimelineCompareOp.EQUAL, metric2, 57));
-      TimelineFilterList metricFilterList =
-          new TimelineFilterList(Operator.OR, list1, list2);
-      Set<TimelineEntity> entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null,
-          null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
-      assertEquals(2, entities.size());
-      int metricCnt = 0;
-      for (TimelineEntity entity : entities) {
-        metricCnt += entity.getMetrics().size();
-      }
-      assertEquals(3, metricCnt);
-
-      TimelineFilterList metricFilterList1 = new TimelineFilterList(
-          new TimelineCompareFilter(
-          TimelineCompareOp.LESS_OR_EQUAL, metric1, 127),
-          new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 30));
-      entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList1, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
-      assertEquals(1, entities.size());
-      metricCnt = 0;
-      for (TimelineEntity entity : entities) {
-        metricCnt += entity.getMetrics().size();
-      }
-      assertEquals(2, metricCnt);
-
-      TimelineFilterList metricFilterList2 = new TimelineFilterList(
-          new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, metric1, 32),
-          new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 57));
-      entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList2, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
-      assertEquals(0, entities.size());
-
-      TimelineFilterList metricFilterList3 = new TimelineFilterList(
-          new TimelineCompareFilter(TimelineCompareOp.EQUAL, "s_metric", 32));
-      entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList3, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
-      assertEquals(0, entities.size());
-
-      TimelineFilterList list3 = new TimelineFilterList();
-      list3.addFilter(new TimelineCompareFilter(
-          TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
-      TimelineFilterList list4 = new TimelineFilterList();
-      list4.addFilter(new TimelineCompareFilter(
-          TimelineCompareOp.LESS_THAN, metric1, 43));
-      list4.addFilter(new TimelineCompareFilter(
-          TimelineCompareOp.EQUAL, metric2, 57));
-      TimelineFilterList metricFilterList4 =
-          new TimelineFilterList(Operator.OR, list3, list4);
-      TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR,
-          new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
-          metric2.substring(0, metric2.indexOf("_") + 1)));
-      entities = hbr.getEntities(
-          new TimelineReaderContext(cluster, user, flow, null, null,
-          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList4, null),
-          new TimelineDataToRetrieve(null, metricsToRetrieve,
-          EnumSet.of(Field.ALL)));
-      assertEquals(2, entities.size());
-      metricCnt = 0;
-      for (TimelineEntity entity : entities) {
-        metricCnt += entity.getMetrics().size();
-      }
-      assertEquals(1, metricCnt);
-    } finally {
-      hbr.close();
-    }
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    util.shutdownMiniCluster();
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org