You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:49:03 UTC
[17/50] [abbrv] hadoop git commit: YARN-5045. hbase unit tests fail
due to dependency issues. (Sangjin Lee via varunsaxena)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
deleted file mode 100644
index 58d5e61..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.ReadOnlyProps;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-
-public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
- private static PhoenixOfflineAggregationWriterImpl storage;
- private static final int BATCH_SIZE = 3;
-
- @BeforeClass
- public static void setup() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- storage = setupPhoenixClusterAndWriterForTest(conf);
- }
-
- @Test(timeout = 90000)
- public void testFlowLevelAggregationStorage() throws Exception {
- testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
- }
-
- @Test(timeout = 90000)
- public void testUserLevelAggregationStorage() throws Exception {
- testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
- }
-
- @AfterClass
- public static void cleanup() throws Exception {
- storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
- storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
- tearDownMiniCluster();
- }
-
- private static PhoenixOfflineAggregationWriterImpl
- setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
- throws Exception{
- Map<String, String> props = new HashMap<>();
- // Must update config before starting server
- props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
- Boolean.FALSE.toString());
- props.put("java.security.krb5.realm", "");
- props.put("java.security.krb5.kdc", "");
- props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
- Boolean.FALSE.toString());
- props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
- props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
- // Make a small batch size to test multiple calls to reserve sequences
- props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
- Long.toString(BATCH_SIZE));
- // Must update config before starting server
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-
- // Change connection settings for test
- conf.set(
- YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
- getUrl());
- PhoenixOfflineAggregationWriterImpl
- myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
- myWriter.init(conf);
- myWriter.start();
- myWriter.createPhoenixTables();
- return myWriter;
- }
-
- private static TimelineEntity getTestAggregationTimelineEntity() {
- TimelineEntity entity = new TimelineEntity();
- String id = "hello1";
- String type = "testAggregationType";
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(1425016501000L);
-
- TimelineMetric metric = new TimelineMetric();
- metric.setId("HDFS_BYTES_READ");
- metric.addValue(1425016501100L, 8000);
- entity.addMetric(metric);
-
- return entity;
- }
-
- private void testAggregator(OfflineAggregationInfo aggregationInfo)
- throws Exception {
- // Set up a list of timeline entities and write them back to Phoenix
- int numEntity = 1;
- TimelineEntities te = new TimelineEntities();
- te.addEntity(getTestAggregationTimelineEntity());
- TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
- "user1", "testFlow", null, 0L, null);
- storage.writeAggregatedEntity(context, te,
- aggregationInfo);
-
- // Verify if we're storing all entities
- String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
- String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
- +") FROM " + aggregationInfo.getTableName();
- verifySQLWithCount(sql, numEntity, "Number of entities should be ");
- // Check metric
- sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
- + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
- verifySQLWithCount(sql, numEntity,
- "Number of entities with info should be ");
- }
-
-
- private void verifySQLWithCount(String sql, int targetCount, String message)
- throws Exception {
- try (
- Statement stmt =
- storage.getConnection().createStatement();
- ResultSet rs = stmt.executeQuery(sql)) {
- assertTrue("Result set empty on statement " + sql, rs.next());
- assertNotNull("Fail to execute query " + sql, rs);
- assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
- } catch (SQLException se) {
- fail("SQL exception on query: " + sql
- + " With exception message: " + se.getLocalizedMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
deleted file mode 100644
index 3b8036d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Generates the data/entities for the FlowRun and FlowActivity Tables
- */
-class TestFlowDataGenerator {
-
- private static final String metric1 = "MAP_SLOT_MILLIS";
- private static final String metric2 = "HDFS_BYTES_READ";
- public static final long END_TS_INCR = 10000L;
-
- static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunMetrics_test";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- long cTime = 1425016501000L;
- entity.setCreatedTime(cTime);
-
- // add metrics
- Set<TimelineMetric> metrics = new HashSet<>();
- TimelineMetric m1 = new TimelineMetric();
- m1.setId(metric1);
- Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = insertTs;
-
- for (int k=1; k< 100 ; k++) {
- metricValues.put(ts - k*200000, 20L);
- }
- metricValues.put(ts - 80000, 40L);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
-
- TimelineMetric m2 = new TimelineMetric();
- m2.setId(metric2);
- metricValues = new HashMap<Long, Number>();
- ts = System.currentTimeMillis();
- for (int k=1; k< 100 ; k++) {
- metricValues.put(ts - k*100000, 31L);
- }
-
- metricValues.put(ts - 80000, 57L);
- m2.setType(Type.TIME_SERIES);
- m2.setValues(metricValues);
- metrics.add(m2);
-
- entity.addMetrics(metrics);
- return entity;
- }
-
-
- static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunMetrics_test";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- long cTime = 1425016501000L;
- entity.setCreatedTime(cTime);
-
- // add metrics
- Set<TimelineMetric> metrics = new HashSet<>();
- TimelineMetric m1 = new TimelineMetric();
- m1.setId(metric1);
- Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = insertTs;
-
- metricValues.put(ts - 80000, 40L);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
-
- TimelineMetric m2 = new TimelineMetric();
- m2.setId(metric2);
- metricValues = new HashMap<Long, Number>();
- ts = insertTs;
- metricValues.put(ts - 80000, 57L);
- m2.setType(Type.TIME_SERIES);
- m2.setValues(metricValues);
- metrics.add(m2);
-
- entity.addMetrics(metrics);
-
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- event.setTimestamp(insertTs);
- event.addInfo("done", "insertTs=" + insertTs);
- entity.addEvent(event);
- return entity;
- }
-
-
- static TimelineEntity getEntityMetricsApp1(long insertTs) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunMetrics_test";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- long cTime = 1425016501000L;
- entity.setCreatedTime(cTime);
-
- // add metrics
- Set<TimelineMetric> metrics = new HashSet<>();
- TimelineMetric m1 = new TimelineMetric();
- m1.setId(metric1);
- Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = insertTs;
- metricValues.put(ts - 100000, 2L);
- metricValues.put(ts - 80000, 40L);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
-
- TimelineMetric m2 = new TimelineMetric();
- m2.setId(metric2);
- metricValues = new HashMap<Long, Number>();
- ts = insertTs;
- metricValues.put(ts - 100000, 31L);
- metricValues.put(ts - 80000, 57L);
- m2.setType(Type.TIME_SERIES);
- m2.setValues(metricValues);
- metrics.add(m2);
-
- entity.addMetrics(metrics);
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- long endTs = 1439379885000L;
- event.setTimestamp(endTs);
- String expKey = "foo_event_greater";
- String expVal = "test_app_greater";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
- return entity;
- }
-
-
- static TimelineEntity getEntityMetricsApp2(long insertTs) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunMetrics_test";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- long cTime = 1425016501000L;
- entity.setCreatedTime(cTime);
- // add metrics
- Set<TimelineMetric> metrics = new HashSet<>();
- TimelineMetric m1 = new TimelineMetric();
- m1.setId(metric1);
- Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = insertTs;
- metricValues.put(ts - 100000, 5L);
- metricValues.put(ts - 80000, 101L);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
- entity.addMetrics(metrics);
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- long endTs = 1439379885000L;
- event.setTimestamp(endTs);
- String expKey = "foo_event_greater";
- String expVal = "test_app_greater";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
- return entity;
- }
-
- static TimelineEntity getEntity1() {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunHello";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- long cTime = 1425026901000L;
- entity.setCreatedTime(cTime);
- // add metrics
- Set<TimelineMetric> metrics = new HashSet<>();
- TimelineMetric m1 = new TimelineMetric();
- m1.setId(metric1);
- Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = System.currentTimeMillis();
- metricValues.put(ts - 120000, 100000000L);
- metricValues.put(ts - 100000, 200000000L);
- metricValues.put(ts - 80000, 300000000L);
- metricValues.put(ts - 60000, 400000000L);
- metricValues.put(ts - 40000, 50000000000L);
- metricValues.put(ts - 20000, 60000000000L);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
- entity.addMetrics(metrics);
-
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event.setTimestamp(cTime);
- String expKey = "foo_event";
- Object expVal = "test";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
-
- event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- long expTs = cTime + 21600000;// start time + 6hrs
- event.setTimestamp(expTs);
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
-
- return entity;
- }
-
- static TimelineEntity getAFullEntity(long ts, long endTs) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunFullEntity";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(ts);
- // add metrics
- Set<TimelineMetric> metrics = new HashSet<>();
- TimelineMetric m1 = new TimelineMetric();
- m1.setId(metric1);
- Map<Long, Number> metricValues = new HashMap<Long, Number>();
- metricValues.put(ts - 120000, 100000000L);
- metricValues.put(ts - 100000, 200000000L);
- metricValues.put(ts - 80000, 300000000L);
- metricValues.put(ts - 60000, 400000000L);
- metricValues.put(ts - 40000, 50000000000L);
- metricValues.put(ts - 20000, 60000000000L);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
- TimelineMetric m2 = new TimelineMetric();
- m2.setId(metric2);
- metricValues = new HashMap<Long, Number>();
- metricValues.put(ts - 900000, 31L);
- metricValues.put(ts - 30000, 57L);
- m2.setType(Type.TIME_SERIES);
- m2.setValues(metricValues);
- metrics.add(m2);
- entity.addMetrics(metrics);
-
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event.setTimestamp(ts);
- String expKey = "foo_event";
- Object expVal = "test";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
-
- event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- long expTs = ts + 21600000;// start time + 6hrs
- event.setTimestamp(expTs);
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
-
- return entity;
- }
-
- static TimelineEntity getEntityGreaterStartTime(long startTs) {
- TimelineEntity entity = new TimelineEntity();
- entity.setCreatedTime(startTs);
- entity.setId("flowRunHello with greater start time");
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setType(type);
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event.setTimestamp(startTs);
- String expKey = "foo_event_greater";
- String expVal = "test_app_greater";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
- return entity;
- }
-
- static TimelineEntity getEntityMaxEndTime(long endTs) {
- TimelineEntity entity = new TimelineEntity();
- entity.setId("flowRunHello Max End time");
- entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- event.setTimestamp(endTs);
- String expKey = "foo_even_max_ finished";
- String expVal = "test_app_max_finished";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
- return entity;
- }
-
- static TimelineEntity getEntityMinStartTime(long startTs) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunHelloMInStartTime";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(startTs);
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event.setTimestamp(startTs);
- entity.addEvent(event);
- return entity;
- }
-
- static TimelineEntity getMinFlushEntity(long startTs) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunHelloFlushEntityMin";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(startTs);
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event.setTimestamp(startTs);
- entity.addEvent(event);
- return entity;
- }
-
- static TimelineEntity getMaxFlushEntity(long startTs) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunHelloFlushEntityMax";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(startTs);
-
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- event.setTimestamp(startTs + END_TS_INCR);
- entity.addEvent(event);
- return entity;
- }
-
- static TimelineEntity getFlowApp1(long appCreatedTime) {
- TimelineEntity entity = new TimelineEntity();
- String id = "flowActivity_test";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(appCreatedTime);
-
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event.setTimestamp(appCreatedTime);
- String expKey = "foo_event";
- Object expVal = "test";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
-
- return entity;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
deleted file mode 100644
index 6b23b6c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests the FlowRun and FlowActivity Tables
- */
-public class TestHBaseStorageFlowActivity {
-
- private static HBaseTestingUtility util;
-
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
- util = new HBaseTestingUtility();
- Configuration conf = util.getConfiguration();
- conf.setInt("hfile.format.version", 3);
- util.startMiniCluster();
- createSchema();
- }
-
- private static void createSchema() throws IOException {
- TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
- }
-
- /**
- * Writes 4 timeline entities belonging to one flow run through the
- * {@link HBaseTimelineWriterImpl}
- *
- * Checks the flow run table contents
- *
- * The first entity has a created event, metrics and a finish event.
- *
- * The second entity has a created event and this is the entity with smallest
- * start time. This should be the start time for the flow run.
- *
- * The third entity has a finish event and this is the entity with the max end
- * time. This should be the end time for the flow run.
- *
- * The fourth entity has a created event which has a start time that is
- * greater than min start time.
- *
- * The test also checks in the flow activity table that one entry has been
- * made for all of these 4 application entities since they belong to the same
- * flow run.
- */
- @Test
- public void testWriteFlowRunMinMax() throws Exception {
-
- TimelineEntities te = new TimelineEntities();
- te.addEntity(TestFlowDataGenerator.getEntity1());
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
- String user = "testWriteFlowRunMinMaxToHBase_user1";
- String flow = "testing_flowRun_flow_name";
- String flowVersion = "CF7022C10F1354";
- long runid = 1002345678919L;
- String appName = "application_100000000000_1111";
- long minStartTs = 1424995200300L;
- long greaterStartTs = 1424995200300L + 864000L;
- long endTs = 1424995200300L + 86000000L;;
- TimelineEntity entityMinStartTime = TestFlowDataGenerator
- .getEntityMinStartTime(minStartTs);
-
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // write another entity with the right min start time
- te = new TimelineEntities();
- te.addEntity(entityMinStartTime);
- appName = "application_100000000000_3333";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // writer another entity for max end time
- TimelineEntity entityMaxEndTime = TestFlowDataGenerator
- .getEntityMaxEndTime(endTs);
- te = new TimelineEntities();
- te.addEntity(entityMaxEndTime);
- appName = "application_100000000000_4444";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // writer another entity with greater start time
- TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
- .getEntityGreaterStartTime(greaterStartTs);
- te = new TimelineEntities();
- te.addEntity(entityGreaterStartTime);
- appName = "application_1000000000000000_2222";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // flush everything to hbase
- hbi.flush();
- } finally {
- hbi.close();
- }
-
- Connection conn = ConnectionFactory.createConnection(c1);
- // check in flow activity table
- Table table1 = conn.getTable(TableName
- .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
- byte[] startRow =
- FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
- Get g = new Get(startRow);
- Result r1 = table1.get(g);
- assertNotNull(r1);
- assertTrue(!r1.isEmpty());
- Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
- .getBytes());
- assertEquals(1, values.size());
- byte[] row = r1.getRow();
- FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row);
- assertNotNull(flowActivityRowKey);
- assertEquals(cluster, flowActivityRowKey.getClusterId());
- assertEquals(user, flowActivityRowKey.getUserId());
- assertEquals(flow, flowActivityRowKey.getFlowName());
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
- assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
- assertEquals(1, values.size());
- checkFlowActivityRunId(runid, flowVersion, values);
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
- // get the flow activity entity
- Set<TimelineEntity> entities = hbr.getEntities(
- new TimelineReaderContext(cluster, null, null, null, null,
- TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
- new TimelineEntityFilters(10L, null, null, null, null, null,
- null, null, null),
- new TimelineDataToRetrieve());
- assertEquals(1, entities.size());
- for (TimelineEntity e : entities) {
- FlowActivityEntity flowActivity = (FlowActivityEntity)e;
- assertEquals(cluster, flowActivity.getCluster());
- assertEquals(user, flowActivity.getUser());
- assertEquals(flow, flowActivity.getFlowName());
- assertEquals(dayTs, flowActivity.getDate().getTime());
- Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
- assertEquals(1, flowRuns.size());
- }
- } finally {
- hbr.close();
- }
- }
-
- /**
- * Write 1 application entity and checks the record for today in the flow
- * activity table
- */
- @Test
- public void testWriteFlowActivityOneFlow() throws Exception {
- String cluster = "testWriteFlowActivityOneFlow_cluster1";
- String user = "testWriteFlowActivityOneFlow_user1";
- String flow = "flow_activity_test_flow_name";
- String flowVersion = "A122110F135BC4";
- long runid = 1001111178919L;
-
- TimelineEntities te = new TimelineEntities();
- long appCreatedTime = 1425016501000L;
- TimelineEntity entityApp1 =
- TestFlowDataGenerator.getFlowApp1(appCreatedTime);
- te.addEntity(entityApp1);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- String appName = "application_1111999999_1234";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
- hbi.flush();
- } finally {
- hbi.close();
- }
- // check flow activity
- checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
- appCreatedTime);
-
- // use the reader to verify the data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
-
- Set<TimelineEntity> entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
- new TimelineEntityFilters(10L, null, null, null, null, null,
- null, null, null),
- new TimelineDataToRetrieve());
- assertEquals(1, entities.size());
- for (TimelineEntity e : entities) {
- FlowActivityEntity entity = (FlowActivityEntity)e;
- NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
- assertEquals(1, flowRuns.size());
- for (FlowRunEntity flowRun : flowRuns) {
- assertEquals(runid, flowRun.getRunId());
- assertEquals(flowVersion, flowRun.getVersion());
- }
- }
- } finally {
- hbr.close();
- }
- }
-
- private void checkFlowActivityTable(String cluster, String user, String flow,
- String flowVersion, long runid, Configuration c1, long appCreatedTime)
- throws IOException {
- Scan s = new Scan();
- s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
- byte[] startRow =
- FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
- s.setStartRow(startRow);
- String clusterStop = cluster + "1";
- byte[] stopRow =
- FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
- s.setStopRow(stopRow);
- Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn.getTable(TableName
- .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
- ResultScanner scanner = table1.getScanner(s);
- int rowCount = 0;
- for (Result result : scanner) {
- assertNotNull(result);
- assertTrue(!result.isEmpty());
- Map<byte[], byte[]> values = result
- .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
- rowCount++;
- byte[] row = result.getRow();
- FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
- .parseRowKey(row);
- assertNotNull(flowActivityRowKey);
- assertEquals(cluster, flowActivityRowKey.getClusterId());
- assertEquals(user, flowActivityRowKey.getUserId());
- assertEquals(flow, flowActivityRowKey.getFlowName());
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
- assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
- assertEquals(1, values.size());
- checkFlowActivityRunId(runid, flowVersion, values);
- }
- assertEquals(1, rowCount);
- }
-
- /**
- * Writes 3 applications each with a different run id and version for the same
- * {cluster, user, flow}
- *
- * They should be getting inserted into one record in the flow activity table
- * with 3 columns, one per run id
- */
- @Test
- public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
- String cluster = "testManyRunsFlowActivity_cluster1";
- String user = "testManyRunsFlowActivity_c_user1";
- String flow = "flow_activity_test_flow_name";
- String flowVersion1 = "A122110F135BC4";
- long runid1 = 11111111111L;
-
- String flowVersion2 = "A12222222222C4";
- long runid2 = 2222222222222L;
-
- String flowVersion3 = "A1333333333C4";
- long runid3 = 3333333333333L;
-
- TimelineEntities te = new TimelineEntities();
- long appCreatedTime = 1425016501000L;
- TimelineEntity entityApp1 =
- TestFlowDataGenerator.getFlowApp1(appCreatedTime);
- te.addEntity(entityApp1);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- String appName = "application_11888888888_1111";
- hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
-
- // write an application with to this flow but a different runid/ version
- te = new TimelineEntities();
- te.addEntity(entityApp1);
- appName = "application_11888888888_2222";
- hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
-
- // write an application with to this flow but a different runid/ version
- te = new TimelineEntities();
- te.addEntity(entityApp1);
- appName = "application_11888888888_3333";
- hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
-
- hbi.flush();
- } finally {
- hbi.close();
- }
- // check flow activity
- checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
- runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
-
- Set<TimelineEntity> entities = hbr.getEntities(
- new TimelineReaderContext(cluster, null, null, null, null,
- TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
- new TimelineEntityFilters(10L, null, null, null, null, null,
- null, null, null),
- new TimelineDataToRetrieve());
- assertEquals(1, entities.size());
- for (TimelineEntity e : entities) {
- FlowActivityEntity flowActivity = (FlowActivityEntity)e;
- assertEquals(cluster, flowActivity.getCluster());
- assertEquals(user, flowActivity.getUser());
- assertEquals(flow, flowActivity.getFlowName());
- long dayTs =
- TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
- assertEquals(dayTs, flowActivity.getDate().getTime());
- Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
- assertEquals(3, flowRuns.size());
- for (FlowRunEntity flowRun : flowRuns) {
- long runId = flowRun.getRunId();
- String version = flowRun.getVersion();
- if (runId == runid1) {
- assertEquals(flowVersion1, version);
- } else if (runId == runid2) {
- assertEquals(flowVersion2, version);
- } else if (runId == runid3) {
- assertEquals(flowVersion3, version);
- } else {
- fail("unknown run id: " + runId);
- }
- }
- }
- } finally {
- hbr.close();
- }
- }
-
- private void checkFlowActivityTableSeveralRuns(String cluster, String user,
- String flow, Configuration c1, String flowVersion1, long runid1,
- String flowVersion2, long runid2, String flowVersion3, long runid3,
- long appCreatedTime)
- throws IOException {
- Scan s = new Scan();
- s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
- byte[] startRow =
- FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
- s.setStartRow(startRow);
- String clusterStop = cluster + "1";
- byte[] stopRow =
- FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
- s.setStopRow(stopRow);
- Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn.getTable(TableName
- .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
- ResultScanner scanner = table1.getScanner(s);
- int rowCount = 0;
- for (Result result : scanner) {
- assertNotNull(result);
- assertTrue(!result.isEmpty());
- byte[] row = result.getRow();
- FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
- .parseRowKey(row);
- assertNotNull(flowActivityRowKey);
- assertEquals(cluster, flowActivityRowKey.getClusterId());
- assertEquals(user, flowActivityRowKey.getUserId());
- assertEquals(flow, flowActivityRowKey.getFlowName());
- long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
- assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
-
- Map<byte[], byte[]> values = result
- .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
- rowCount++;
- assertEquals(3, values.size());
- checkFlowActivityRunId(runid1, flowVersion1, values);
- checkFlowActivityRunId(runid2, flowVersion2, values);
- checkFlowActivityRunId(runid3, flowVersion3, values);
- }
- // the flow activity table is such that it will insert
- // into current day's record
- // hence, if this test runs across the midnight boundary,
- // it may fail since it would insert into two records
- // one for each day
- assertEquals(1, rowCount);
- }
-
- private void checkFlowActivityRunId(long runid, String flowVersion,
- Map<byte[], byte[]> values) throws IOException {
- byte[] rq = ColumnHelper.getColumnQualifier(
- FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
- GenericObjectMapper.write(runid));
- for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
- String actualQ = Bytes.toString(k.getKey());
- if (Bytes.toString(rq).equals(actualQ)) {
- String actualV = (String) GenericObjectMapper.read(k.getValue());
- assertEquals(flowVersion, actualV);
- }
- }
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- util.shutdownMiniCluster();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
deleted file mode 100644
index 801d43c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ /dev/null
@@ -1,851 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests the FlowRun and FlowActivity Tables
- */
-public class TestHBaseStorageFlowRun {
-
- private static HBaseTestingUtility util;
-
- private final String metric1 = "MAP_SLOT_MILLIS";
- private final String metric2 = "HDFS_BYTES_READ";
-
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
- util = new HBaseTestingUtility();
- Configuration conf = util.getConfiguration();
- conf.setInt("hfile.format.version", 3);
- util.startMiniCluster();
- createSchema();
- }
-
- private static void createSchema() throws IOException {
- TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
- }
-
- @Test
- public void checkCoProcessorOff() throws IOException, InterruptedException {
- Configuration hbaseConf = util.getConfiguration();
- TableName table = TableName.valueOf(hbaseConf.get(
- FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
- Connection conn = null;
- conn = ConnectionFactory.createConnection(hbaseConf);
- Admin admin = conn.getAdmin();
- if (admin == null) {
- throw new IOException("Can't check tables since admin is null");
- }
- if (admin.tableExists(table)) {
- // check the regions.
- // check in flow run table
- util.waitUntilAllRegionsAssigned(table);
- HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
- assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
- hbaseConf));
- }
- }
-
- table = TableName.valueOf(hbaseConf.get(
- FlowActivityTable.TABLE_NAME_CONF_NAME,
- FlowActivityTable.DEFAULT_TABLE_NAME));
- if (admin.tableExists(table)) {
- // check the regions.
- // check in flow activity table
- util.waitUntilAllRegionsAssigned(table);
- HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
- assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
- hbaseConf));
- }
- }
-
- table = TableName.valueOf(hbaseConf.get(
- EntityTable.TABLE_NAME_CONF_NAME,
- EntityTable.DEFAULT_TABLE_NAME));
- if (admin.tableExists(table)) {
- // check the regions.
- // check in entity run table
- util.waitUntilAllRegionsAssigned(table);
- HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
- assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
- hbaseConf));
- }
- }
- }
-
- /**
- * Writes 4 timeline entities belonging to one flow run through the
- * {@link HBaseTimelineWriterImpl}
- *
- * Checks the flow run table contents
- *
- * The first entity has a created event, metrics and a finish event.
- *
- * The second entity has a created event and this is the entity with smallest
- * start time. This should be the start time for the flow run.
- *
- * The third entity has a finish event and this is the entity with the max end
- * time. This should be the end time for the flow run.
- *
- * The fourth entity has a created event which has a start time that is
- * greater than min start time.
- *
- */
- @Test
- public void testWriteFlowRunMinMax() throws Exception {
-
- TimelineEntities te = new TimelineEntities();
- te.addEntity(TestFlowDataGenerator.getEntity1());
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
- String user = "testWriteFlowRunMinMaxToHBase_user1";
- String flow = "testing_flowRun_flow_name";
- String flowVersion = "CF7022C10F1354";
- long runid = 1002345678919L;
- String appName = "application_100000000000_1111";
- long minStartTs = 1425026900000L;
- long greaterStartTs = 30000000000000L;
- long endTs = 1439750690000L;
- TimelineEntity entityMinStartTime = TestFlowDataGenerator
- .getEntityMinStartTime(minStartTs);
-
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // write another entity with the right min start time
- te = new TimelineEntities();
- te.addEntity(entityMinStartTime);
- appName = "application_100000000000_3333";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // writer another entity for max end time
- TimelineEntity entityMaxEndTime = TestFlowDataGenerator
- .getEntityMaxEndTime(endTs);
- te = new TimelineEntities();
- te.addEntity(entityMaxEndTime);
- appName = "application_100000000000_4444";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // writer another entity with greater start time
- TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
- .getEntityGreaterStartTime(greaterStartTs);
- te = new TimelineEntities();
- te.addEntity(entityGreaterStartTime);
- appName = "application_1000000000000000_2222";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-
- // flush everything to hbase
- hbi.flush();
- } finally {
- hbi.close();
- }
-
- Connection conn = ConnectionFactory.createConnection(c1);
- // check in flow run table
- Table table1 = conn.getTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
- // scan the table and see that we get back the right min and max
- // timestamps
- byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
- Get g = new Get(startRow);
- g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
- FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
- g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
- FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
- Result r1 = table1.get(g);
- assertNotNull(r1);
- assertTrue(!r1.isEmpty());
- Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
- .getBytes());
-
- assertEquals(2, r1.size());
- long starttime = Bytes.toLong(values.get(
- FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
- assertEquals(minStartTs, starttime);
- assertEquals(endTs, Bytes.toLong(values
- .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
- // get the flow run entity
- TimelineEntity entity = hbr.getEntity(
- new TimelineReaderContext(cluster, user, flow, runid, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineDataToRetrieve());
- assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
- FlowRunEntity flowRun = (FlowRunEntity)entity;
- assertEquals(minStartTs, flowRun.getStartTime());
- assertEquals(endTs, flowRun.getMaxEndTime());
- } finally {
- hbr.close();
- }
- }
-
- /**
- * Writes two application entities of the same flow run. Each application has
- * two metrics: slot millis and hdfs bytes read. Each metric has values at two
- * timestamps.
- *
- * Checks the metric values of the flow in the flow run table. Flow metric
- * values should be the sum of individual metric values that belong to the
- * latest timestamp for that metric
- */
- @Test
- public void testWriteFlowRunMetricsOneFlow() throws Exception {
- String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
- String user = "testWriteFlowRunMetricsOneFlow_user1";
- String flow = "testing_flowRun_metrics_flow_name";
- String flowVersion = "CF7022C10F1354";
- long runid = 1002345678919L;
-
- TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator
- .getEntityMetricsApp1(System.currentTimeMillis());
- te.addEntity(entityApp1);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- String appName = "application_11111111111111_1111";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
- // write another application with same metric to this flow
- te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator
- .getEntityMetricsApp2(System.currentTimeMillis());
- te.addEntity(entityApp2);
- appName = "application_11111111111111_2222";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
- hbi.flush();
- } finally {
- hbi.close();
- }
-
- // check flow run
- checkFlowRunTable(cluster, user, flow, runid, c1);
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
- TimelineEntity entity = hbr.getEntity(
- new TimelineReaderContext(cluster, user, flow, runid, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineDataToRetrieve());
- assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
- Set<TimelineMetric> metrics = entity.getMetrics();
- assertEquals(2, metrics.size());
- for (TimelineMetric metric : metrics) {
- String id = metric.getId();
- Map<Long, Number> values = metric.getValues();
- assertEquals(1, values.size());
- Number value = null;
- for (Number n : values.values()) {
- value = n;
- }
- switch (id) {
- case metric1:
- assertEquals(141L, value);
- break;
- case metric2:
- assertEquals(57L, value);
- break;
- default:
- fail("unrecognized metric: " + id);
- }
- }
- } finally {
- hbr.close();
- }
- }
-
- private void checkFlowRunTable(String cluster, String user, String flow,
- long runid, Configuration c1) throws IOException {
- Scan s = new Scan();
- s.addFamily(FlowRunColumnFamily.INFO.getBytes());
- byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
- s.setStartRow(startRow);
- String clusterStop = cluster + "1";
- byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
- s.setStopRow(stopRow);
- Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn.getTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
- ResultScanner scanner = table1.getScanner(s);
-
- int rowCount = 0;
- for (Result result : scanner) {
- assertNotNull(result);
- assertTrue(!result.isEmpty());
- Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
- .getBytes());
- rowCount++;
- // check metric1
- byte[] q = ColumnHelper.getColumnQualifier(
- FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
- assertTrue(values.containsKey(q));
- assertEquals(141L, Bytes.toLong(values.get(q)));
-
- // check metric2
- assertEquals(3, values.size());
- q = ColumnHelper.getColumnQualifier(
- FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
- assertTrue(values.containsKey(q));
- assertEquals(57L, Bytes.toLong(values.get(q)));
- }
- assertEquals(1, rowCount);
- }
-
- @Test
- public void testWriteFlowRunMetricsPrefix() throws Exception {
- String cluster = "testWriteFlowRunMetricsPrefix_cluster1";
- String user = "testWriteFlowRunMetricsPrefix_user1";
- String flow = "testWriteFlowRunMetricsPrefix_flow_name";
- String flowVersion = "CF7022C10F1354";
-
- TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator
- .getEntityMetricsApp1(System.currentTimeMillis());
- te.addEntity(entityApp1);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- String appName = "application_11111111111111_1111";
- hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te);
- // write another application with same metric to this flow
- te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator
- .getEntityMetricsApp2(System.currentTimeMillis());
- te.addEntity(entityApp2);
- appName = "application_11111111111111_2222";
- hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
- hbi.flush();
- } finally {
- hbi.close();
- }
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
- TimelineFilterList metricsToRetrieve = new TimelineFilterList(
- Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
- metric1.substring(0, metric1.indexOf("_") + 1)));
- TimelineEntity entity = hbr.getEntity(
- new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineDataToRetrieve(null, metricsToRetrieve, null));
- assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
- Set<TimelineMetric> metrics = entity.getMetrics();
- assertEquals(1, metrics.size());
- for (TimelineMetric metric : metrics) {
- String id = metric.getId();
- Map<Long, Number> values = metric.getValues();
- assertEquals(1, values.size());
- Number value = null;
- for (Number n : values.values()) {
- value = n;
- }
- switch (id) {
- case metric1:
- assertEquals(40L, value);
- break;
- default:
- fail("unrecognized metric: " + id);
- }
- }
-
- Set<TimelineEntity> entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(),
- new TimelineDataToRetrieve(null, metricsToRetrieve, null));
- assertEquals(2, entities.size());
- int metricCnt = 0;
- for (TimelineEntity timelineEntity : entities) {
- metricCnt += timelineEntity.getMetrics().size();
- }
- assertEquals(2, metricCnt);
- } finally {
- hbr.close();
- }
- }
-
- @Test
- public void testWriteFlowRunsMetricFields() throws Exception {
- String cluster = "testWriteFlowRunsMetricFields_cluster1";
- String user = "testWriteFlowRunsMetricFields_user1";
- String flow = "testWriteFlowRunsMetricFields_flow_name";
- String flowVersion = "CF7022C10F1354";
- long runid = 1002345678919L;
-
- TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator
- .getEntityMetricsApp1(System.currentTimeMillis());
- te.addEntity(entityApp1);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- String appName = "application_11111111111111_1111";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
- // write another application with same metric to this flow
- te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator
- .getEntityMetricsApp2(System.currentTimeMillis());
- te.addEntity(entityApp2);
- appName = "application_11111111111111_2222";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
- hbi.flush();
- } finally {
- hbi.close();
- }
-
- // check flow run
- checkFlowRunTable(cluster, user, flow, runid, c1);
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
- Set<TimelineEntity> entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, runid, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(),
- new TimelineDataToRetrieve());
- assertEquals(1, entities.size());
- for (TimelineEntity timelineEntity : entities) {
- assertEquals(0, timelineEntity.getMetrics().size());
- }
-
- entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, runid, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(),
- new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
- assertEquals(1, entities.size());
- for (TimelineEntity timelineEntity : entities) {
- Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
- assertEquals(2, timelineMetrics.size());
- for (TimelineMetric metric : timelineMetrics) {
- String id = metric.getId();
- Map<Long, Number> values = metric.getValues();
- assertEquals(1, values.size());
- Number value = null;
- for (Number n : values.values()) {
- value = n;
- }
- switch (id) {
- case metric1:
- assertEquals(141L, value);
- break;
- case metric2:
- assertEquals(57L, value);
- break;
- default:
- fail("unrecognized metric: " + id);
- }
- }
- }
- } finally {
- hbr.close();
- }
- }
-
- @Test
- public void testWriteFlowRunFlush() throws Exception {
- String cluster = "atestFlushFlowRun_cluster1";
- String user = "atestFlushFlowRun__user1";
- String flow = "atestFlushFlowRun_flow_name";
- String flowVersion = "AF1021C19F1351";
- long runid = 1449526652000L;
-
- int start = 10;
- int count = 20000;
- int appIdSuffix = 1;
- HBaseTimelineWriterImpl hbi = null;
- long insertTs = 1449796654827L - count;
- long minTS = insertTs + 1;
- long startTs = insertTs;
- Configuration c1 = util.getConfiguration();
- TimelineEntities te1 = null;
- TimelineEntity entityApp1 = null;
- TimelineEntity entityApp2 = null;
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
-
- for (int i = start; i < count; i++) {
- String appName = "application_1060350000000_" + appIdSuffix;
- insertTs++;
- te1 = new TimelineEntities();
- entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
- te1.addEntity(entityApp1);
- entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
- te1.addEntity(entityApp2);
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
- Thread.sleep(1);
-
- appName = "application_1001199480000_7" + appIdSuffix;
- insertTs++;
- appIdSuffix++;
- te1 = new TimelineEntities();
- entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
- te1.addEntity(entityApp1);
- entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
- te1.addEntity(entityApp2);
-
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
- if (i % 1000 == 0) {
- hbi.flush();
- checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
- runid, false);
- }
- }
- } finally {
- hbi.flush();
- hbi.close();
- checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
- true);
- }
- }
-
- private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
- int count, String cluster, String user, String flow, long runid,
- boolean checkMax) throws IOException {
- Connection conn = ConnectionFactory.createConnection(c1);
- // check in flow run table
- Table table1 = conn.getTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
- // scan the table and see that we get back the right min and max
- // timestamps
- byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
- Get g = new Get(startRow);
- g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
- FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
- g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
- FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
-
- Result r1 = table1.get(g);
- assertNotNull(r1);
- assertTrue(!r1.isEmpty());
- Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
- .getBytes());
- int start = 10;
- assertEquals(2, r1.size());
- long starttime = Bytes.toLong(values
- .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
- assertEquals(minTS, starttime);
- if (checkMax) {
- assertEquals(startTs + 2 * (count - start)
- + TestFlowDataGenerator.END_TS_INCR,
- Bytes.toLong(values
- .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
- }
- }
-
- @Test
- public void testFilterFlowRunsByCreatedTime() throws Exception {
- String cluster = "cluster2";
- String user = "user2";
- String flow = "flow_name2";
-
- TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
- System.currentTimeMillis());
- entityApp1.setCreatedTime(1425016501000L);
- te.addEntity(entityApp1);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
- "application_11111111111111_1111", te);
- // write another application with same metric to this flow
- te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
- System.currentTimeMillis());
- entityApp2.setCreatedTime(1425016502000L);
- te.addEntity(entityApp2);
- hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
- "application_11111111111111_2222", te);
- hbi.flush();
- } finally {
- hbi.close();
- }
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
-
- Set<TimelineEntity> entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow,
- null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, 1425016501000L, 1425016502001L, null,
- null, null, null, null, null), new TimelineDataToRetrieve());
- assertEquals(2, entities.size());
- for (TimelineEntity entity : entities) {
- if (!entity.getId().equals("user2@flow_name2/1002345678918") &&
- !entity.getId().equals("user2@flow_name2/1002345678919")) {
- fail("Entities with flow runs 1002345678918 and 1002345678919" +
- "should be present.");
- }
- }
- entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, 1425016501050L, null, null, null,
- null, null, null, null), new TimelineDataToRetrieve());
- assertEquals(1, entities.size());
- for (TimelineEntity entity : entities) {
- if (!entity.getId().equals("user2@flow_name2/1002345678918")) {
- fail("Entity with flow run 1002345678918 should be present.");
- }
- }
- entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, null, 1425016501050L, null, null,
- null, null, null, null), new TimelineDataToRetrieve());
- assertEquals(1, entities.size());
- for (TimelineEntity entity : entities) {
- if (!entity.getId().equals("user2@flow_name2/1002345678919")) {
- fail("Entity with flow run 1002345678919 should be present.");
- }
- }
- } finally {
- hbr.close();
- }
- }
-
- @Test
- public void testMetricFilters() throws Exception {
- String cluster = "cluster1";
- String user = "user1";
- String flow = "flow_name1";
-
- TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
- System.currentTimeMillis());
- te.addEntity(entityApp1);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
- "application_11111111111111_1111", te);
- // write another application with same metric to this flow
- te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
- System.currentTimeMillis());
- te.addEntity(entityApp2);
- hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
- "application_11111111111111_2222", te);
- hbi.flush();
- } finally {
- hbi.close();
- }
-
- // use the timeline reader to verify data
- HBaseTimelineReaderImpl hbr = null;
- try {
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
-
- TimelineFilterList list1 = new TimelineFilterList();
- list1.addFilter(new TimelineCompareFilter(
- TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
- TimelineFilterList list2 = new TimelineFilterList();
- list2.addFilter(new TimelineCompareFilter(
- TimelineCompareOp.LESS_THAN, metric1, 43));
- list2.addFilter(new TimelineCompareFilter(
- TimelineCompareOp.EQUAL, metric2, 57));
- TimelineFilterList metricFilterList =
- new TimelineFilterList(Operator.OR, list1, list2);
- Set<TimelineEntity> entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null,
- null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, null, null, null, null, null, null,
- metricFilterList, null),
- new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
- assertEquals(2, entities.size());
- int metricCnt = 0;
- for (TimelineEntity entity : entities) {
- metricCnt += entity.getMetrics().size();
- }
- assertEquals(3, metricCnt);
-
- TimelineFilterList metricFilterList1 = new TimelineFilterList(
- new TimelineCompareFilter(
- TimelineCompareOp.LESS_OR_EQUAL, metric1, 127),
- new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 30));
- entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, null, null, null, null, null, null,
- metricFilterList1, null),
- new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
- assertEquals(1, entities.size());
- metricCnt = 0;
- for (TimelineEntity entity : entities) {
- metricCnt += entity.getMetrics().size();
- }
- assertEquals(2, metricCnt);
-
- TimelineFilterList metricFilterList2 = new TimelineFilterList(
- new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, metric1, 32),
- new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 57));
- entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, null, null, null, null, null, null,
- metricFilterList2, null),
- new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
- assertEquals(0, entities.size());
-
- TimelineFilterList metricFilterList3 = new TimelineFilterList(
- new TimelineCompareFilter(TimelineCompareOp.EQUAL, "s_metric", 32));
- entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, null, null, null, null, null, null,
- metricFilterList3, null),
- new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
- assertEquals(0, entities.size());
-
- TimelineFilterList list3 = new TimelineFilterList();
- list3.addFilter(new TimelineCompareFilter(
- TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
- TimelineFilterList list4 = new TimelineFilterList();
- list4.addFilter(new TimelineCompareFilter(
- TimelineCompareOp.LESS_THAN, metric1, 43));
- list4.addFilter(new TimelineCompareFilter(
- TimelineCompareOp.EQUAL, metric2, 57));
- TimelineFilterList metricFilterList4 =
- new TimelineFilterList(Operator.OR, list3, list4);
- TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR,
- new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
- metric2.substring(0, metric2.indexOf("_") + 1)));
- entities = hbr.getEntities(
- new TimelineReaderContext(cluster, user, flow, null, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null),
- new TimelineEntityFilters(null, null, null, null, null, null, null,
- metricFilterList4, null),
- new TimelineDataToRetrieve(null, metricsToRetrieve,
- EnumSet.of(Field.ALL)));
- assertEquals(2, entities.size());
- metricCnt = 0;
- for (TimelineEntity entity : entities) {
- metricCnt += entity.getMetrics().size();
- }
- assertEquals(1, metricCnt);
- } finally {
- hbr.close();
- }
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- util.shutdownMiniCluster();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org