You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/09/17 19:51:38 UTC
[1/3] hadoop git commit: YARN-3901. Populate flow run data in the
flow_run & flow activity tables (Vrushali C via sjlee)
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 b1960e0d2 -> 4b37985e6
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
new file mode 100644
index 0000000..b4a0c74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun and FlowActivity Tables
+ */
+public class TestHBaseStorageFlowActivity {
+
+ private static HBaseTestingUtility util;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ createSchema();
+ }
+
+ private static void createSchema() throws IOException {
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+ }
+
+ /**
+ * Writes 4 timeline entities belonging to one flow run through the
+ * {@link HBaseTimelineWriterImpl}
+ *
+ * Checks the flow run table contents
+ *
+ * The first entity has a created event, metrics and a finish event.
+ *
+ * The second entity has a created event and this is the entity with smallest
+ * start time. This should be the start time for the flow run.
+ *
+ * The third entity has a finish event and this is the entity with the max end
+ * time. This should be the end time for the flow run.
+ *
+ * The fourth entity has a created event which has a start time that is
+ * greater than min start time.
+ *
+ * The test also checks in the flow activity table that one entry has been
+ * made for all of these 4 application entities since they belong to the same
+ * flow run.
+ */
+ @Test
+ public void testWriteFlowRunMinMax() throws Exception {
+
+ TimelineEntities te = new TimelineEntities();
+ te.addEntity(TestFlowDataGenerator.getEntity1());
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+ String user = "testWriteFlowRunMinMaxToHBase_user1";
+ String flow = "testing_flowRun_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+ String appName = "application_100000000000_1111";
+ long endTs = 1439750690000L;
+ TimelineEntity entityMinStartTime = TestFlowDataGenerator
+ .getEntityMinStartTime();
+
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity with the right min start time
+ te = new TimelineEntities();
+ te.addEntity(entityMinStartTime);
+ appName = "application_100000000000_3333";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // writer another entity for max end time
+ TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+ .getEntityMaxEndTime(endTs);
+ te = new TimelineEntities();
+ te.addEntity(entityMaxEndTime);
+ appName = "application_100000000000_4444";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // writer another entity with greater start time
+ TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+ .getEntityGreaterStartTime();
+ te = new TimelineEntities();
+ te.addEntity(entityGreaterStartTime);
+ appName = "application_1000000000000000_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // flush everything to hbase
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ Connection conn = ConnectionFactory.createConnection(c1);
+ // check in flow activity table
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ Get g = new Get(startRow);
+ Result r1 = table1.get(g);
+ assertNotNull(r1);
+ assertTrue(!r1.isEmpty());
+ Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
+ .getBytes());
+ assertEquals(1, values.size());
+ byte[] row = r1.getRow();
+ FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row);
+ assertNotNull(flowActivityRowKey);
+ assertEquals(cluster, flowActivityRowKey.getClusterId());
+ assertEquals(user, flowActivityRowKey.getUserId());
+ assertEquals(flow, flowActivityRowKey.getFlowId());
+ long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ .currentTimeMillis());
+ assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+ assertEquals(1, values.size());
+ checkFlowActivityRunId(runid, flowVersion, values);
+ }
+
+ /**
+ * Write 1 application entity and checks the record for today in the flow
+ * activity table
+ */
+ @Test
+ public void testWriteFlowActivityOneFlow() throws Exception {
+ String cluster = "testWriteFlowActivityOneFlow_cluster1";
+ String user = "testWriteFlowActivityOneFlow_user1";
+ String flow = "flow_activity_test_flow_name";
+ String flowVersion = "A122110F135BC4";
+ Long runid = 1001111178919L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_1111999999_1234";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+ // check flow activity
+ checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+ }
+
+ private void checkFlowActivityTable(String cluster, String user, String flow,
+ String flowVersion, Long runid, Configuration c1) throws IOException {
+ Scan s = new Scan();
+ s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+ byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ s.setStartRow(startRow);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+ rowCount++;
+ byte[] row = result.getRow();
+ FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+ .parseRowKey(row);
+ assertNotNull(flowActivityRowKey);
+ assertEquals(cluster, flowActivityRowKey.getClusterId());
+ assertEquals(user, flowActivityRowKey.getUserId());
+ assertEquals(flow, flowActivityRowKey.getFlowId());
+ long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ .currentTimeMillis());
+ assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+ assertEquals(1, values.size());
+ checkFlowActivityRunId(runid, flowVersion, values);
+ }
+ assertEquals(1, rowCount);
+ }
+
+ /**
+ * Writes 3 applications each with a different run id and version for the same
+ * {cluster, user, flow}
+ *
+ * They should be getting inserted into one record in the flow activity table
+ * with 3 columns, one per run id
+ */
+ @Test
+ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
+ String cluster = "testManyRunsFlowActivity_cluster1";
+ String user = "testManyRunsFlowActivity_c_user1";
+ String flow = "flow_activity_test_flow_name";
+ String flowVersion1 = "A122110F135BC4";
+ Long runid1 = 11111111111L;
+
+ String flowVersion2 = "A12222222222C4";
+ long runid2 = 2222222222222L;
+
+ String flowVersion3 = "A1333333333C4";
+ long runid3 = 3333333333333L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_11888888888_1111";
+ hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+
+ // write an application with to this flow but a different runid/ version
+ te = new TimelineEntities();
+ te.addEntity(entityApp1);
+ appName = "application_11888888888_2222";
+ hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+
+ // write an application with to this flow but a different runid/ version
+ te = new TimelineEntities();
+ te.addEntity(entityApp1);
+ appName = "application_11888888888_3333";
+ hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+ // check flow activity
+ checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
+ runid1, flowVersion2, runid2, flowVersion3, runid3);
+
+ }
+
+ private void checkFlowActivityTableSeveralRuns(String cluster, String user,
+ String flow, Configuration c1, String flowVersion1, Long runid1,
+ String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+ throws IOException {
+ Scan s = new Scan();
+ s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+ byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ s.setStartRow(startRow);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ byte[] row = result.getRow();
+ FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+ .parseRowKey(row);
+ assertNotNull(flowActivityRowKey);
+ assertEquals(cluster, flowActivityRowKey.getClusterId());
+ assertEquals(user, flowActivityRowKey.getUserId());
+ assertEquals(flow, flowActivityRowKey.getFlowId());
+ long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ .currentTimeMillis());
+ assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+ rowCount++;
+ assertEquals(3, values.size());
+ checkFlowActivityRunId(runid1, flowVersion1, values);
+ checkFlowActivityRunId(runid2, flowVersion2, values);
+ checkFlowActivityRunId(runid3, flowVersion3, values);
+ }
+ // the flow activity table is such that it will insert
+ // into current day's record
+ // hence, if this test runs across the midnight boundary,
+ // it may fail since it would insert into two records
+ // one for each day
+ assertEquals(1, rowCount);
+ }
+
+ private void checkFlowActivityRunId(Long runid, String flowVersion,
+ Map<byte[], byte[]> values) throws IOException {
+ byte[] rq = ColumnHelper.getColumnQualifier(
+ FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
+ GenericObjectMapper.write(runid));
+ for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
+ String actualQ = Bytes.toString(k.getKey());
+ if (Bytes.toString(rq).equals(actualQ)) {
+ String actualV = (String) GenericObjectMapper.read(k.getValue());
+ assertEquals(flowVersion, actualV);
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
new file mode 100644
index 0000000..bf524ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun and FlowActivity Tables
+ */
+public class TestHBaseStorageFlowRun {
+
+ private static HBaseTestingUtility util;
+
+ private final String metric1 = "MAP_SLOT_MILLIS";
+ private final String metric2 = "HDFS_BYTES_READ";
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ createSchema();
+ }
+
+ private static void createSchema() throws IOException {
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+ }
+
+ /**
+ * Writes 4 timeline entities belonging to one flow run through the
+ * {@link HBaseTimelineWriterImpl}
+ *
+ * Checks the flow run table contents
+ *
+ * The first entity has a created event, metrics and a finish event.
+ *
+ * The second entity has a created event and this is the entity with smallest
+ * start time. This should be the start time for the flow run.
+ *
+ * The third entity has a finish event and this is the entity with the max end
+ * time. This should be the end time for the flow run.
+ *
+ * The fourth entity has a created event which has a start time that is
+ * greater than min start time.
+ *
+ */
+ @Test
+ public void testWriteFlowRunMinMax() throws Exception {
+
+ TimelineEntities te = new TimelineEntities();
+ te.addEntity(TestFlowDataGenerator.getEntity1());
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+ String user = "testWriteFlowRunMinMaxToHBase_user1";
+ String flow = "testing_flowRun_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+ String appName = "application_100000000000_1111";
+ long endTs = 1439750690000L;
+ TimelineEntity entityMinStartTime = TestFlowDataGenerator
+ .getEntityMinStartTime();
+
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity with the right min start time
+ te = new TimelineEntities();
+ te.addEntity(entityMinStartTime);
+ appName = "application_100000000000_3333";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // writer another entity for max end time
+ TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+ .getEntityMaxEndTime(endTs);
+ te = new TimelineEntities();
+ te.addEntity(entityMaxEndTime);
+ appName = "application_100000000000_4444";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // writer another entity with greater start time
+ TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+ .getEntityGreaterStartTime();
+ te = new TimelineEntities();
+ te.addEntity(entityGreaterStartTime);
+ appName = "application_1000000000000000_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // flush everything to hbase
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ Connection conn = ConnectionFactory.createConnection(c1);
+ // check in flow run table
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ // scan the table and see that we get back the right min and max
+ // timestamps
+ byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ Get g = new Get(startRow);
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+ Result r1 = table1.get(g);
+ assertNotNull(r1);
+ assertTrue(!r1.isEmpty());
+ Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+ .getBytes());
+
+ assertEquals(2, r1.size());
+ Long starttime = (Long) GenericObjectMapper.read(values
+ .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+ Long expmin = entityMinStartTime.getCreatedTime();
+ assertEquals(expmin, starttime);
+ assertEquals(endTs, GenericObjectMapper.read(values
+ .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+ }
+
+ boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
+ String flow, Long runid) {
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+ assertTrue(rowKeyComponents.length == 4);
+ assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+ assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+ assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+ assertEquals(TimelineWriterUtils.invert(runid),
+ Bytes.toLong(rowKeyComponents[3]));
+ return true;
+ }
+
+ /**
+ * Writes two application entities of the same flow run. Each application has
+ * two metrics: slot millis and hdfs bytes read. Each metric has values at two
+ * timestamps.
+ *
+ * Checks the metric values of the flow in the flow run table. Flow metric
+ * values should be the sum of individual metric values that belong to the
+ * latest timestamp for that metric
+ */
+ @Test
+ public void testWriteFlowRunMetricsOneFlow() throws Exception {
+ String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+ String user = "testWriteFlowRunMetricsOneFlow_user1";
+ String flow = "testing_flowRun_metrics_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_11111111111111_1111";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ // write another application with same metric to this flow
+ te = new TimelineEntities();
+ TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ te.addEntity(entityApp2);
+ appName = "application_11111111111111_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ // check flow run
+ checkFlowRunTable(cluster, user, flow, runid, c1);
+ }
+
+ private void checkFlowRunTable(String cluster, String user, String flow,
+ long runid, Configuration c1) throws IOException {
+ Scan s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ s.setStartRow(startRow);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+ .getBytes());
+ rowCount++;
+ // check metric1
+ byte[] q = ColumnHelper.getColumnQualifier(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+ assertTrue(values.containsKey(q));
+ assertEquals(141, GenericObjectMapper.read(values.get(q)));
+
+ // check metric2
+ assertEquals(2, values.size());
+ q = ColumnHelper.getColumnQualifier(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+ assertTrue(values.containsKey(q));
+ assertEquals(57, GenericObjectMapper.read(values.get(q)));
+ }
+ assertEquals(1, rowCount);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+}
[2/3] hadoop git commit: YARN-3901. Populate flow run data in the
flow_run & flow activity tables (Vrushali C via sjlee)
Posted by sj...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
new file mode 100644
index 0000000..af8df99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow activity table has column family info
+ * Stores the daily activity record for flows
+ * Useful as a quick lookup of what flows were
+ * running on a given day
+ *
+ * Example flow activity table record:
+ *
+ * </pre>
+ * |-------------------------------------------|
+ * | Row key | Column Family |
+ * | | info |
+ * |-------------------------------------------|
+ * | clusterId! | r!runid1:version1 |
+ * | inv Top of | |
+ * | Day! | r!runid2:version7 |
+ * | userName! | |
+ * | flowId | |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowActivityTable extends BaseTable<FlowActivityTable> {
+ /** flow activity table prefix */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+ /** config param name that specifies the flowactivity table name */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for flowactivity table name */
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.flowactivity";
+
+ private static final Log LOG = LogFactory.getLog(FlowActivityTable.class);
+
+ /** default max number of versions */
+ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+ public FlowActivityTable() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+ * (org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor FlowActivityTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ FlowActivityTableDescp.addFamily(infoCF);
+ infoCF.setMinVersions(1);
+ infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+ // TODO: figure the split policy before running in production
+ admin.createTable(FlowActivityTableDescp);
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
new file mode 100644
index 0000000..ad30add
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
/**
 * Identifies fully qualified columns for the {@link FlowRunTable}.
 */
public enum FlowRunColumn implements Column<FlowRunTable> {

  /**
   * When the flow was started. This is the minimum of currently known
   * application start times.
   */
  MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
      AggregationOperation.MIN),

  /**
   * When the flow ended. This is the maximum of currently known application end
   * times.
   */
  MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
      AggregationOperation.MAX),

  /**
   * The version of the flow that this flow belongs to.
   */
  FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null);

  // Helper that performs the actual store/read against HBase.
  private final ColumnHelper<FlowRunTable> column;
  // Column family this column lives in (INFO for all current constants).
  private final ColumnFamily<FlowRunTable> columnFamily;
  // Plain-text qualifier name, e.g. "min_start_time".
  private final String columnQualifier;
  // Encoded byte form of the qualifier, computed once at enum creation.
  private final byte[] columnQualifierBytes;
  // Aggregation applied on write (MIN/MAX), or null for plain stores
  // (e.g. FLOW_VERSION).
  private final AggregationOperation aggOp;

  private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
      String columnQualifier, AggregationOperation aggOp) {
    this.columnFamily = columnFamily;
    this.columnQualifier = columnQualifier;
    this.aggOp = aggOp;
    // Future-proof by ensuring the right column prefix hygiene.
    this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE
        .encode(columnQualifier));
    this.column = new ColumnHelper<FlowRunTable>(columnFamily);
  }

  /**
   * @return the column name value
   */
  private String getColumnQualifier() {
    return columnQualifier;
  }

  /**
   * @return a defensive copy of the encoded qualifier bytes, so callers
   *         cannot mutate this enum's internal state.
   */
  public byte[] getColumnQualifierBytes() {
    return columnQualifierBytes.clone();
  }

  /**
   * @return the aggregation operation applied when this column is stored;
   *         may be null (no aggregation), e.g. for {@link #FLOW_VERSION}.
   */
  public AggregationOperation getAggregationOperation() {
    return aggOp;
  }

  /*
   * (non-Javadoc)
   *
   * Stores inputValue under this column for the given row, combining this
   * column's aggregation operation with any caller-supplied attributes so
   * downstream processing can aggregate the value.
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store
   * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common.
   * TypedBufferedMutator, java.lang.Long, java.lang.Object,
   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
   */
  public void store(byte[] rowKey,
      TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
      Object inputValue, Attribute... attributes) throws IOException {

    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
        attributes, aggOp);
    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
        inputValue, combinedAttributes);
  }

  /**
   * Reads this column's value out of the given {@link Result} row.
   */
  public Object readResult(Result result) throws IOException {
    return column.readResult(result, columnQualifierBytes);
  }

  /**
   * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
   * and only if {@code x.equals(y)} or {@code (x == y == null)}
   *
   * @param columnQualifier
   *          Name of the column to retrieve
   * @return the corresponding {@link FlowRunColumn} or null
   */
  public static final FlowRunColumn columnFor(String columnQualifier) {

    // Match column based on value, assume column family matches.
    for (FlowRunColumn ec : FlowRunColumn.values()) {
      // Find a match based only on name.
      if (ec.getColumnQualifier().equals(columnQualifier)) {
        return ec;
      }
    }

    // Default to null
    return null;
  }

  /**
   * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
   * if and only if {@code a.equals(b) & x.equals(y)} or
   * {@code (x == y == null)}
   *
   * @param columnFamily
   *          The columnFamily for which to retrieve the column.
   * @param name
   *          Name of the column to retrieve
   * @return the corresponding {@link FlowRunColumn} or null if both arguments
   *         don't match.
   */
  public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily,
      String name) {

    for (FlowRunColumn ec : FlowRunColumn.values()) {
      // Find a match based column family and on name.
      if (ec.columnFamily.equals(columnFamily)
          && ec.getColumnQualifier().equals(name)) {
        return ec;
      }
    }

    // Default to null
    return null;
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
new file mode 100644
index 0000000..8faf5f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow run table column families.
+ */
+public enum FlowRunColumnFamily implements ColumnFamily<FlowRunTable> {
+
+ /**
+ * Info column family houses known columns, specifically ones included in
+ * columnfamily filters.
+ */
+ INFO("i");
+
+ /**
+ * Byte representation of this column family.
+ */
+ private final byte[] bytes;
+
+ /**
+ * @param value
+ * create a column family with this name. Must be lower case and
+ * without spaces.
+ */
+ private FlowRunColumnFamily(String value) {
+ // column families should be lower case and not contain any spaces.
+ this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+ }
+
+ public byte[] getBytes() {
+ return Bytes.copy(bytes);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
new file mode 100644
index 0000000..d55f510
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+
+ /**
+ * To store flow run info values.
+ */
+ METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM);
+
+ private final ColumnHelper<FlowRunTable> column;
+ private final ColumnFamily<FlowRunTable> columnFamily;
+
+ /**
+ * Can be null for those cases where the provided column qualifier is the
+ * entire column name.
+ */
+ private final String columnPrefix;
+ private final byte[] columnPrefixBytes;
+
+ private final AggregationOperation aggOp;
+
+ /**
+ * Private constructor, meant to be used by the enum definition.
+ *
+ * @param columnFamily
+ * that this column is stored in.
+ * @param columnPrefix
+ * for this column.
+ */
+ private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+ String columnPrefix, AggregationOperation fra) {
+ column = new ColumnHelper<FlowRunTable>(columnFamily);
+ this.columnFamily = columnFamily;
+ this.columnPrefix = columnPrefix;
+ if (columnPrefix == null) {
+ this.columnPrefixBytes = null;
+ } else {
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+ .encode(columnPrefix));
+ }
+ this.aggOp = fra;
+ }
+
+ /**
+ * @return the column name value
+ */
+ public String getColumnPrefix() {
+ return columnPrefix;
+ }
+
+ public byte[] getColumnPrefixBytes() {
+ return columnPrefixBytes.clone();
+ }
+
+ public AggregationOperation getAttribute() {
+ return aggOp;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ */
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<FlowRunTable> tableMutator, String qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ attributes, this.aggOp);
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ combinedAttributes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ */
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<FlowRunTable> tableMutator, byte[] qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ attributes, this.aggOp);
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ combinedAttributes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+ */
+ public Object readResult(Result result, String qualifier) throws IOException {
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ return column.readResult(result, columnQualifier);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResults(org.apache.hadoop.hbase.client.Result)
+ */
+ public Map<String, Object> readResults(Result result) throws IOException {
+ return column.readResults(result, columnPrefixBytes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+ */
+ public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
+ Result result) throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes);
+ }
+
+ /**
+ * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is
+ * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+ * and only if {@code x.equals(y)} or {@code (x == y == null)}
+ *
+ * @param columnPrefix
+ * Name of the column to retrieve
+ * @return the corresponding {@link FlowRunColumnPrefix} or null
+ */
+ public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
+
+ // Match column based on value, assume column family matches.
+ for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+ // Find a match based only on name.
+ if (frcp.getColumnPrefix().equals(columnPrefix)) {
+ return frcp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+
+ /**
+ * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is
+ * no match. The following holds true:
+ * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+ * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+ *
+ * @param columnFamily
+ * The columnFamily for which to retrieve the column.
+ * @param columnPrefix
+ * Name of the column to retrieve
+ * @return the corresponding {@link FlowRunColumnPrefix} or null if both
+ * arguments don't match.
+ */
+ public static final FlowRunColumnPrefix columnFor(
+ FlowRunColumnFamily columnFamily, String columnPrefix) {
+
+ // TODO: needs unit test to confirm and need to update javadoc to explain
+ // null prefix case.
+
+ for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+ // Find a match based column family and on name.
+ if (frcp.columnFamily.equals(columnFamily)
+ && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || (frcp
+ .getColumnPrefix().equals(columnPrefix)))) {
+ return frcp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
new file mode 100644
index 0000000..f743e5e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
/**
 * Region observer for the flow run table: tags cells with their aggregation
 * attributes on write, and wraps read scanners in a {@link FlowScanner} so
 * that metrics are aggregated on the fly.
 */
public class FlowRunCoprocessor extends BaseRegionObserver {

  @SuppressWarnings("unused")
  private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);

  // Region this coprocessor instance is attached to; set in start().
  private HRegion region;
  /**
   * generate a timestamp that is unique per row in a region this is per region
   */
  private final TimestampGenerator timestampGenerator = new TimestampGenerator();

  @Override
  public void start(CoprocessorEnvironment e) throws IOException {
    // Capture the region handle so later hooks can open region scanners.
    if (e instanceof RegionCoprocessorEnvironment) {
      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
      this.region = env.getRegion();
    }
  }

  /*
   * (non-Javadoc)
   *
   * This method adds the tags onto the cells in the Put. It is presumed that
   * all the cells in one Put have the same set of Tags. The existing cell
   * timestamp is overwritten for non-metric cells and each such cell gets a new
   * unique timestamp generated by {@link TimestampGenerator}
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
   * .hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Put,
   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
   * org.apache.hadoop.hbase.client.Durability)
   */
  @Override
  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
      WALEdit edit, Durability durability) throws IOException {
    Map<String, byte[]> attributes = put.getAttributesMap();

    // Assumption is that all the cells in a put are the same operation.
    List<Tag> tags = new ArrayList<>();
    if ((attributes != null) && (attributes.size() > 0)) {
      // Convert each Put attribute into an HBase cell Tag.
      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
        tags.add(t);
      }
      byte[] tagByteArray = Tag.fromList(tags);
      // Rebuild the family->cells map with tagged (and possibly re-timestamped)
      // copies of each cell; cells are immutable, so each must be recreated.
      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
          Bytes.BYTES_COMPARATOR);
      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
          .entrySet()) {
        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
        for (Cell cell : entry.getValue()) {
          // for each cell in the put add the tags
          // Assumption is that all the cells in
          // one put are the same operation
          // also, get a unique cell timestamp for non-metric cells
          // this way we don't inadvertently overwrite cell versions
          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
              tagByteArray));
        }
        newFamilyMap.put(entry.getKey(), newCells);
      } // for each entry
      // Update the family map for the Put
      put.setFamilyCellMap(newFamilyMap);
    }
  }

  /**
   * Determines if the current cell's timestamp is to be used or a new unique
   * cell timestamp is to be used. The reason this is done is to avoid
   * inadvertently overwriting cells when writes come in very fast. But for
   * metric cells, the cell timestamp signifies the metric timestamp. Hence we
   * don't want to overwrite it.
   *
   * @param timestamp the timestamp already on the cell
   * @param tags tags for the cell (currently unused in the decision —
   *          presumably intended to identify metric cells; confirm)
   * @return cell timestamp
   */
  private long getCellTimestamp(long timestamp, List<Tag> tags) {
    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
    // then use the generator
    if (timestamp == HConstants.LATEST_TIMESTAMP) {
      return timestampGenerator.getUniqueTimestamp();
    } else {
      return timestamp;
    }
  }

  /*
   * (non-Javadoc)
   *
   * Creates a {@link FlowScanner} Scan so that it can correctly process the
   * contents of {@link FlowRunTable}. Bypasses the default Get path entirely
   * and answers from the aggregating scanner instead.
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
   * .hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Get, java.util.List)
   */
  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
      Get get, List<Cell> results) throws IOException {
    Scan scan = new Scan(get);
    // all versions are needed so the scanner can aggregate across them
    scan.setMaxVersions();
    RegionScanner scanner = null;
    try {
      scanner = new FlowScanner(region, scan.getBatch(),
          region.getScanner(scan));
      scanner.next(results);
      // skip the default Get processing; results are already populated
      e.bypass();
    } finally {
      if (scanner != null) {
        scanner.close();
      }
    }
  }

  /*
   * (non-Javadoc)
   *
   * Ensures that max versions are set for the Scan so that metrics can be
   * correctly aggregated and min/max can be correctly determined.
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
   * .apache.hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Scan,
   * org.apache.hadoop.hbase.regionserver.RegionScanner)
   */
  @Override
  public RegionScanner preScannerOpen(
      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
      RegionScanner s) throws IOException {
    // set max versions for scan to see all
    // versions to aggregate for metrics
    scan.setMaxVersions();
    return s;
  }

  /*
   * (non-Javadoc)
   *
   * Creates a {@link FlowScanner} Scan so that it can correctly process the
   * contents of {@link FlowRunTable}.
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Scan,
   * org.apache.hadoop.hbase.regionserver.RegionScanner)
   */
  @Override
  public RegionScanner postScannerOpen(
      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
      RegionScanner scanner) throws IOException {
    // wrap the region's scanner so reads see aggregated values
    return new FlowScanner(region, scan.getBatch(), scanner);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
new file mode 100644
index 0000000..e133241
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
/**
 * Represents a rowkey for the flow run table.
 */
public class FlowRunRowKey {
  // TODO: more methods are needed for this class like parse row key

  /**
   * Constructs a row key for the flow run table as follows: {
   * clusterId!userId!flowId!Inverted Flow Run Id}
   *
   * @param clusterId identifies the cluster
   * @param userId user the flow belongs to
   * @param flowId identifies the flow
   * @param flowRunId run id; stored inverted so newer runs sort first
   * @return byte array with the row key
   */
  public static byte[] getRowKey(String clusterId, String userId,
      String flowId, Long flowRunId) {
    byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
        userId, flowId));
    // Note that flowRunId is a long, so we can't encode them all at the same
    // time.
    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
    return Separator.QUALIFIERS.join(first, second);
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
new file mode 100644
index 0000000..b1b93c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow run table has column family info
+ * Stores per flow run information
+ * aggregated across applications.
+ *
+ * Metrics are also stored in the info column family.
+ *
+ * Example flow run table record:
+ *
+ * <pre>
+ * flow_run table
+ * |-------------------------------------------|
+ * | Row key | Column Family |
+ * | | info |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7 |
+ * | userName! | |
+ * | flowId! | running_apps:1 |
+ * | flowRunId | |
+ * | | min_start_time:1392995080000 |
+ * | | #0:"" |
+ * | | |
+ * | | min_start_time:1392995081012 |
+ * | | #0:appId2 |
+ * | | |
+ * | | min_start_time:1392993083210 |
+ * | | #0:appId3 |
+ * | | |
+ * | | |
+ * | | max_end_time:1392993084018 |
+ * | | #0:"" |
+ * | | |
+ * | | |
+ * | | m!mapInputRecords:127 |
+ * | | #0:"" |
+ * | | |
+ * | | m!mapInputRecords:31 |
+ * | | #2:appId2 |
+ * | | |
+ * | | m!mapInputRecords:37 |
+ * | | #1:appId3 |
+ * | | |
+ * | | |
+ * | | m!mapOutputRecords:181 |
+ * | | #0:"" |
+ * | | |
+ * | | m!mapOutputRecords:37 |
+ * | | #1:appId3 |
+ * | | |
+ * | | |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowRunTable extends BaseTable<FlowRunTable> {
+ /** entity prefix */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+ /** config param name that specifies the flowrun table name */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for flowrun table name */
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+ private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
+
+ /** default max number of versions */
+ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+ public FlowRunTable() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+ * (org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ flowRunTableDescp.addFamily(infoCF);
+ infoCF.setMinVersions(1);
+ infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+ // TODO: figure the split policy
+ flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
+ .getCanonicalName());
+ admin.createTable(flowRunTableDescp);
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
new file mode 100644
index 0000000..a1948aa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for flow run
+ * table. Looks through the list of cells per row, checks their tags and does
+ * operation on those cells as per the cell tags. Transforms reads of the stored
+ * metrics into calculated sums for each column Also, finds the min and max for
+ * start and end times in a flow run.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+ private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+
+ private final HRegion region;
+ private final InternalScanner flowRunScanner;
+ private RegionScanner regionScanner;
+ private final int limit;
+ private boolean hasMore;
+ private byte[] currentRow;
+ private List<Cell> availableCells = new ArrayList<>();
+ private int currentIndex;
+
+ FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
+ this.region = region;
+ this.limit = limit;
+ this.flowRunScanner = internalScanner;
+ if (internalScanner instanceof RegionScanner) {
+ this.regionScanner = (RegionScanner) internalScanner;
+ }
+ // TODO: note if it's compaction/flush
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+ */
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> cells) throws IOException {
+ return nextRaw(cells, limit);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> cells, int limit) throws IOException {
+ return nextInternal(cells, limit);
+ }
+
+ @Override
+ public boolean next(List<Cell> cells) throws IOException {
+ return next(cells, limit);
+ }
+
+ @Override
+ public boolean next(List<Cell> cells, int limit) throws IOException {
+ return nextInternal(cells, limit);
+ }
+
+ private String getAggregationCompactionDimension(List<Tag> tags) {
+ String appId = null;
+ for (Tag t : tags) {
+ if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+ .getType()) {
+ appId = Bytes.toString(t.getValue());
+ }
+ }
+ return appId;
+ }
+
+ /**
+ * This method loops through the cells in a given row of the
+ * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+ * to process the contents. It then calculates the sum or min or max for each
+ * column or returns the cell as is.
+ *
+ * @param cells
+ * @param limit
+ * @return true if next row is available for the scanner, false otherwise
+ * @throws IOException
+ */
+ private boolean nextInternal(List<Cell> cells, int limit) throws IOException {
+ Cell cell = null;
+ startNext();
+ // Loop through all the cells in this row
+ // For min/max/metrics we do need to scan the entire set of cells to get the
+ // right one
+ // But with flush/compaction, the number of cells being scanned will go down
+ // cells are grouped per column qualifier then sorted by cell timestamp
+ // (latest to oldest) per column qualifier
+ // So all cells in one qualifier come one after the other before we see the
+ // next column qualifier
+ ByteArrayComparator comp = new ByteArrayComparator();
+ byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+ AggregationOperation currentAggOp = null;
+ SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+ Set<String> alreadySeenAggDim = new HashSet<>();
+ int addedCnt = 0;
+ while (((cell = peekAtNextCell(limit)) != null)
+ && (limit <= 0 || addedCnt < limit)) {
+ byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
+ if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
+ resetState(currentColumnCells, alreadySeenAggDim);
+ currentColumnQualifier = newColumnQualifier;
+ currentAggOp = getCurrentAggOp(cell);
+ }
+ collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
+ nextCell(limit);
+ }
+ if (!currentColumnCells.isEmpty()) {
+ emitCells(cells, currentColumnCells, currentAggOp);
+ }
+ return hasMore();
+ }
+
+ private AggregationOperation getCurrentAggOp(Cell cell) {
+ List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ // We assume that all the operations for a particular column are the same
+ return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+ }
+
+ /**
+ * resets the parameters to an intialized state for next loop iteration
+ *
+ * @param cell
+ * @param currentAggOp
+ * @param currentColumnCells
+ * @param alreadySeenAggDim
+ * @param collectedButNotEmitted
+ */
+ private void resetState(SortedSet<Cell> currentColumnCells,
+ Set<String> alreadySeenAggDim) {
+ currentColumnCells.clear();
+ alreadySeenAggDim.clear();
+ }
+
+ private void collectCells(SortedSet<Cell> currentColumnCells,
+ AggregationOperation currentAggOp, Cell cell,
+ Set<String> alreadySeenAggDim) throws IOException {
+ if (currentAggOp == null) {
+ // not a min/max/metric cell, so just return it as is
+ currentColumnCells.add(cell);
+ nextCell(limit);
+ return;
+ }
+
+ switch (currentAggOp) {
+ case MIN:
+ if (currentColumnCells.size() == 0) {
+ currentColumnCells.add(cell);
+ } else {
+ Cell currentMinCell = currentColumnCells.first();
+ Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
+ if (!currentMinCell.equals(newMinCell)) {
+ currentColumnCells.remove(currentMinCell);
+ currentColumnCells.add(newMinCell);
+ }
+ }
+ break;
+ case MAX:
+ if (currentColumnCells.size() == 0) {
+ currentColumnCells.add(cell);
+ } else {
+ Cell currentMaxCell = currentColumnCells.first();
+ Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
+ if (!currentMaxCell.equals(newMaxCell)) {
+ currentColumnCells.remove(currentMaxCell);
+ currentColumnCells.add(newMaxCell);
+ }
+ }
+ break;
+ case SUM:
+ case SUM_FINAL:
+ // only if this app has not been seen yet, add to current column cells
+ List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ String aggDim = getAggregationCompactionDimension(tags);
+ if (alreadySeenAggDim.contains(aggDim)) {
+ // if this agg dimension has already been seen,
+ // since they show up in sorted order
+ // we drop the rest which are older
+ // in other words, this cell is older than previously seen cells
+ // for that agg dim
+ } else {
+ // not seen this agg dim, hence consider this cell in our working set
+ currentColumnCells.add(cell);
+ alreadySeenAggDim.add(aggDim);
+ }
+ break;
+ default:
+ break;
+ } // end of switch case
+ }
+
+ /*
+ * Processes the cells in input param currentColumnCells and populates
+ * List<Cell> cells as the output based on the input AggregationOperation
+ * parameter.
+ */
+ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+ AggregationOperation currentAggOp) throws IOException {
+ if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
+ return 0;
+ }
+ if (currentAggOp == null) {
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
+
+ switch (currentAggOp) {
+ case MIN:
+ case MAX:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ case SUM:
+ case SUM_FINAL:
+ Cell sumCell = processSummation(currentColumnCells);
+ cells.add(sumCell);
+ return 1;
+ default:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
+ }
+
+ /*
+ * Returns a cell whose value is the sum of all cell values in the input set.
+ * The new cell created has the timestamp of the most recent metric cell. The
+ * sum of a metric for a flow run is the summation at the point of the last
+ * metric update in that flow till that time.
+ */
+ private Cell processSummation(SortedSet<Cell> currentColumnCells)
+ throws IOException {
+ Number sum = 0;
+ Number currentValue = 0;
+ long ts = 0L;
+ long mostCurrentTimestamp = 0l;
+ Cell mostRecentCell = null;
+ for (Cell cell : currentColumnCells) {
+ currentValue = (Number) GenericObjectMapper.read(CellUtil
+ .cloneValue(cell));
+ ts = cell.getTimestamp();
+ if (mostCurrentTimestamp < ts) {
+ mostCurrentTimestamp = ts;
+ mostRecentCell = cell;
+ }
+ sum = sum.longValue() + currentValue.longValue();
+ }
+ Cell sumCell = createNewCell(mostRecentCell, sum);
+ return sumCell;
+ }
+
+ /**
+ * Determines which cell is to be returned based on the values in each cell
+ * and the comparison operation MIN or MAX.
+ *
+ * @param previouslyChosenCell
+ * @param currentCell
+ * @param currentAggOp
+ * @return the cell which is the min (or max) cell
+ * @throws IOException
+ */
+ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
+ AggregationOperation currentAggOp) throws IOException {
+ if (previouslyChosenCell == null) {
+ return currentCell;
+ }
+ try {
+ long previouslyChosenCellValue = ((Number) GenericObjectMapper
+ .read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
+ long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
+ .cloneValue(currentCell))).longValue();
+ switch (currentAggOp) {
+ case MIN:
+ if (currentCellValue < previouslyChosenCellValue) {
+ // new value is minimum, hence return this cell
+ return currentCell;
+ } else {
+ // previously chosen value is miniumum, hence return previous min cell
+ return previouslyChosenCell;
+ }
+ case MAX:
+ if (currentCellValue > previouslyChosenCellValue) {
+ // new value is max, hence return this cell
+ return currentCell;
+ } else {
+ // previously chosen value is max, hence return previous max cell
+ return previouslyChosenCell;
+ }
+ default:
+ return currentCell;
+ }
+ } catch (IllegalArgumentException iae) {
+ LOG.error("caught iae during conversion to long ", iae);
+ return currentCell;
+ }
+ }
+
+ private Cell createNewCell(Cell origCell, Number number) throws IOException {
+ byte[] newValue = GenericObjectMapper.write(number);
+ return CellUtil.createCell(CellUtil.cloneRow(origCell),
+ CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+ origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+ }
+
+ @Override
+ public void close() throws IOException {
+ flowRunScanner.close();
+ }
+
+ /**
+ * Called to signal the start of the next() call by the scanner.
+ */
+ public void startNext() {
+ currentRow = null;
+ }
+
+ /**
+ * Returns whether or not the underlying scanner has more rows.
+ */
+ public boolean hasMore() {
+ return currentIndex < availableCells.size() ? true : hasMore;
+ }
+
+ /**
+ * Returns the next available cell for the current row and advances the
+ * pointer to the next cell. This method can be called multiple times in a row
+ * to advance through all the available cells.
+ *
+ * @param limit
+ * the limit of number of cells to return if the next batch must be
+ * fetched by the wrapped scanner
+ * @return the next available cell or null if no more cells are available for
+ * the current row
+ * @throws IOException
+ */
+ public Cell nextCell(int limit) throws IOException {
+ Cell cell = peekAtNextCell(limit);
+ if (cell != null) {
+ currentIndex++;
+ }
+ return cell;
+ }
+
+ /**
+ * Returns the next available cell for the current row, without advancing the
+ * pointer. Calling this method multiple times in a row will continue to
+ * return the same cell.
+ *
+ * @param limit
+ * the limit of number of cells to return if the next batch must be
+ * fetched by the wrapped scanner
+ * @return the next available cell or null if no more cells are available for
+ * the current row
+ * @throws IOException
+ */
+ public Cell peekAtNextCell(int limit) throws IOException {
+ if (currentIndex >= availableCells.size()) {
+ // done with current batch
+ availableCells.clear();
+ currentIndex = 0;
+ hasMore = flowRunScanner.next(availableCells, limit);
+ }
+ Cell cell = null;
+ if (currentIndex < availableCells.size()) {
+ cell = availableCells.get(currentIndex);
+ if (currentRow == null) {
+ currentRow = CellUtil.cloneRow(cell);
+ } else if (!CellUtil.matchingRow(cell, currentRow)) {
+ // moved on to the next row
+ // don't use the current cell
+ // also signal no more cells for this row
+ return null;
+ }
+ }
+ return cell;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+ */
+ @Override
+ public long getMaxResultSize() {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's scanner is not a RegionScanner");
+ }
+ return regionScanner.getMaxResultSize();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+ */
+ @Override
+ public long getMvccReadPoint() {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.getMvccReadPoint();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+ */
+ @Override
+ public boolean isFilterDone() throws IOException {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.isFilterDone();
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+ */
+ @Override
+ public boolean reseek(byte[] bytes) throws IOException {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.reseek() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.reseek(bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 2875e01..3962341 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -88,20 +87,15 @@ public class TestHBaseTimelineStorage {
}
private static void createSchema() throws IOException {
- new EntityTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
- new AppToFlowTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
- new ApplicationTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
@Test
public void testWriteApplicationToHBase() throws Exception {
TimelineEntities te = new TimelineEntities();
ApplicationEntity entity = new ApplicationEntity();
- String id = "hello";
- entity.setId(id);
+ String appId = "application_1000178881110_2002";
+ entity.setId(appId);
long cTime = 1425016501000L;
long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
@@ -173,12 +167,12 @@ public class TestHBaseTimelineStorage {
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
- hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+ hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
hbi.stop();
// retrieve the row
byte[] rowKey =
- ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+ ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
@@ -190,11 +184,11 @@ public class TestHBaseTimelineStorage {
// check the row key
byte[] row1 = result.getRow();
assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
- id));
+ appId));
// check info column family
String id1 = ApplicationColumn.ID.readResult(result).toString();
- assertEquals(id, id1);
+ assertEquals(appId, id1);
Number val =
(Number) ApplicationColumn.CREATED_TIME.readResult(result);
@@ -252,17 +246,17 @@ public class TestHBaseTimelineStorage {
assertEquals(metricValues, metricMap);
// read the timeline entity using the reader this time
- TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+ TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
- id, entity.getType(), null, null, null, null, null, null, null,
+ appId, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
// verify attributes
- assertEquals(id, e1.getId());
+ assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
@@ -576,7 +570,7 @@ public class TestHBaseTimelineStorage {
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L;
- String appName = "some app name";
+ String appName = "application_123465899910_1001";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
hbi.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
new file mode 100644
index 0000000..f8331fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+
+/**
+ * Generates the data/entities for the FlowRun and FlowActivity Tables
+ */
+class TestFlowDataGenerator {
+
+ private final static String metric1 = "MAP_SLOT_MILLIS";
+ private final static String metric2 = "HDFS_BYTES_READ";
+
+
+ static TimelineEntity getEntityMetricsApp1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 2);
+ metricValues.put(ts - 80000, 40);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+ metricValues = new HashMap<Long, Number>();
+ ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 31);
+ metricValues.put(ts - 80000, 57);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+ static TimelineEntity getEntityMetricsApp2() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 5L);
+ metricValues.put(ts - 80000, 101L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+ static TimelineEntity getEntity1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHello";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 20000000000000L;
+ Long mTime = 1425026901000L;
+ entity.setCreatedTime(cTime);
+ entity.setModifiedTime(mTime);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 120000, 100000000);
+ metricValues.put(ts - 100000, 200000000);
+ metricValues.put(ts - 80000, 300000000);
+ metricValues.put(ts - 60000, 400000000);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ Long expTs = 1436512802000L;
+ event.setTimestamp(expTs);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(1436512801000L);
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
+ /**
+ * Builds a YARN_APPLICATION entity whose created time (30000000000000L)
+ * is larger than getEntity1's (20000000000000L), with a single CREATED
+ * event — used to check that the flow run min-start-time is not raised.
+ */
+ static TimelineEntity getEntityGreaterStartTime() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setCreatedTime(30000000000000L);
+ entity.setId("flowRunHello with greater start time");
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setType(type);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ long endTs = 1439379885000L;
+ event.setTimestamp(endTs);
+ String expKey = "foo_event_greater";
+ String expVal = "test_app_greater";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ return entity;
+ }
+
+ /**
+ * Builds a YARN_APPLICATION entity carrying only a FINISHED event at the
+ * caller-supplied timestamp — used to exercise the flow run
+ * max-end-time aggregation.
+ *
+ * @param endTs timestamp to set on the application-finished event
+ */
+ static TimelineEntity getEntityMaxEndTime(long endTs) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setId("flowRunHello Max End time");
+ entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(endTs);
+ String expKey = "foo_even_max_ finished";
+ String expVal = "test_app_max_finished";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ return entity;
+ }
+
+ /**
+ * Builds a YARN_APPLICATION entity with the smallest created time used
+ * by these tests (10000000000000L) and a CREATED event stamped with the
+ * current wall-clock time — used to exercise the flow run
+ * min-start-time aggregation.
+ */
+ static TimelineEntity getEntityMinStartTime() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHelloMInStartTime";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 10000000000000L;
+ entity.setCreatedTime(cTime);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(System.currentTimeMillis());
+ entity.addEvent(event);
+ return entity;
+ }
+
+
+ /**
+ * Builds a YARN_APPLICATION entity ("flowActivity_test") with a fixed
+ * created time and a single CREATED event carrying one info entry — used
+ * to populate the flow activity table.
+ */
+ static TimelineEntity getFlowApp1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowActivity_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ // application-created event at a fixed timestamp with one info entry
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ Long expTs = 1436512802000L;
+ event.setTimestamp(expTs);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
+}
[3/3] hadoop git commit: YARN-3901. Populate flow run data in the
flow_run & flow activity tables (Vrushali C via sjlee)
Posted by sj...@apache.org.
YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b37985e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b37985e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b37985e
Branch: refs/heads/YARN-2928
Commit: 4b37985e6a238c7f84eecc4e7eadd30dffc90b88
Parents: b1960e0
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Sep 17 10:34:52 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Sep 17 10:34:52 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop-yarn-server-timelineservice/pom.xml | 13 +
.../storage/HBaseTimelineWriterImpl.java | 179 ++++++-
.../storage/TimelineSchemaCreator.java | 22 +-
.../storage/application/ApplicationColumn.java | 5 +-
.../application/ApplicationColumnPrefix.java | 15 +-
.../storage/apptoflow/AppToFlowColumn.java | 6 +-
.../timelineservice/storage/common/Column.java | 6 +-
.../storage/common/ColumnHelper.java | 93 +++-
.../storage/common/ColumnPrefix.java | 28 +-
.../storage/common/TimelineWriterUtils.java | 185 +++++++
.../storage/common/TimestampGenerator.java | 112 +++++
.../storage/common/package-info.java | 24 -
.../storage/entity/EntityColumn.java | 6 +-
.../storage/entity/EntityColumnPrefix.java | 20 +-
.../flow/AggregationCompactionDimension.java | 63 +++
.../storage/flow/AggregationOperation.java | 87 ++++
.../timelineservice/storage/flow/Attribute.java | 39 ++
.../storage/flow/FlowActivityColumnFamily.java | 54 +++
.../storage/flow/FlowActivityColumnPrefix.java | 243 ++++++++++
.../storage/flow/FlowActivityRowKey.java | 113 +++++
.../storage/flow/FlowActivityTable.java | 107 ++++
.../storage/flow/FlowRunColumn.java | 161 ++++++
.../storage/flow/FlowRunColumnFamily.java | 54 +++
.../storage/flow/FlowRunColumnPrefix.java | 239 +++++++++
.../storage/flow/FlowRunCoprocessor.java | 210 ++++++++
.../storage/flow/FlowRunRowKey.java | 50 ++
.../storage/flow/FlowRunTable.java | 141 ++++++
.../storage/flow/FlowScanner.java | 486 +++++++++++++++++++
.../storage/TestHBaseTimelineStorage.java | 28 +-
.../storage/flow/TestFlowDataGenerator.java | 213 ++++++++
.../flow/TestHBaseStorageFlowActivity.java | 372 ++++++++++++++
.../storage/flow/TestHBaseStorageFlowRun.java | 290 +++++++++++
33 files changed, 3562 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0d43135..cc0f014 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -103,6 +103,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4102. Add a "skip existing table" mode for timeline schema creator (Li
Lu via sjlee)
+ YARN-3901. Populate flow run data in the flow_run & flow activity tables
+ (Vrushali C via sjlee)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index da7fadf..758feb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -174,6 +174,19 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <additionnalDependencies>
+ <additionnalDependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ </additionnalDependency>
+ </additionnalDependencies>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 772002d..7c4a5da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -33,11 +33,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -53,23 +52,36 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
- * This implements a hbase based backend for storing application timeline entity
+ * This implements a hbase based backend for storing the timeline entity
* information.
+ * It writes to multiple tables at the backend
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HBaseTimelineWriterImpl extends AbstractService implements
TimelineWriter {
+ private static final Log LOG = LogFactory
+ .getLog(HBaseTimelineWriterImpl.class);
+
private Connection conn;
private TypedBufferedMutator<EntityTable> entityTable;
private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
private TypedBufferedMutator<ApplicationTable> applicationTable;
-
- private static final Log LOG = LogFactory
- .getLog(HBaseTimelineWriterImpl.class);
+ private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+ private TypedBufferedMutator<FlowRunTable> flowRunTable;
public HBaseTimelineWriterImpl() {
super(HBaseTimelineWriterImpl.class.getName());
@@ -91,6 +103,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
+ flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
+ flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn);
}
/**
@@ -111,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// if the entity is the application, the destination is the application
// table
- boolean isApplication = isApplicationEntity(te);
+ boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
byte[] rowKey = isApplication ?
ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
appId) :
@@ -124,37 +138,139 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
storeMetrics(rowKey, te.getMetrics(), isApplication);
storeRelations(rowKey, te, isApplication);
- if (isApplicationCreated(te)) {
- onApplicationCreated(
- clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
+ if (isApplication) {
+ if (TimelineWriterUtils.isApplicationCreated(te)) {
+ onApplicationCreated(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ }
+ // if it's an application entity, store metrics
+ storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
+ appId, te);
+ // if application has finished, store it's finish time and write final
+ // values
+ // of all metrics
+ if (TimelineWriterUtils.isApplicationFinished(te)) {
+ onApplicationFinished(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ }
}
}
return putStatus;
}
- private static boolean isApplicationEntity(TimelineEntity te) {
- return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+ private void onApplicationCreated(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ // store in App to flow table
+ storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId,
+ appId, te);
+ // store in flow run table
+ storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ // store in flow activity table
+ storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
}
- private static boolean isApplicationCreated(TimelineEntity te) {
- if (isApplicationEntity(te)) {
- for (TimelineEvent event : te.getEvents()) {
- if (event.getId().equals(
- ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
- return true;
- }
- }
- }
- return false;
+ /*
+ * updates the {@link FlowActivityTable} with the Application TimelineEntity
+ * information
+ */
+ private void storeInFlowActivityTable(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+ byte[] qualifier = GenericObjectMapper.write(flowRunId);
+ FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+ null, flowVersion,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
- private void onApplicationCreated(String clusterId, String userId,
+ /*
+ * updates the {@link FlowRunTable} with Application Created information
+ */
+ private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+ flowRunId);
+ FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
+ te.getCreatedTime(),
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ }
+
+ private void storeInAppToFlowTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
- AppToFlowColumn.FLOW_RUN_ID.store(
- rowKey, appToFlowTable, null, flowRunId);
+ AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+ }
+
+ /*
+ * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+ * application has finished
+ */
+ private void onApplicationFinished(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ // store in flow run table
+ storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
+ appId, te);
+
+ // indicate in the flow activity table that the app has finished
+ storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ }
+
+ /*
+ * Update the {@link FlowRunTable} with Application Finished information
+ */
+ private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
+ String flowName, long flowRunId, String appId, TimelineEntity te)
+ throws IOException {
+ byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+ flowRunId);
+ Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
+ .getAttribute(appId);
+ FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
+ TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+
+ // store the final value of metrics since application has finished
+ Set<TimelineMetric> metrics = te.getMetrics();
+ if (metrics != null) {
+ storeFlowMetrics(rowKey, metrics, attributeAppId,
+ AggregationOperation.SUM_FINAL.getAttribute());
+ }
+ }
+
+ /*
+ * Updates the {@link FlowRunTable} with Application Metrics
+ */
+ private void storeFlowMetricsAppRunning(String clusterId, String userId,
+ String flowName, long flowRunId, String appId, TimelineEntity te)
+ throws IOException {
+ Set<TimelineMetric> metrics = te.getMetrics();
+ if (metrics != null) {
+ byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+ flowRunId);
+ storeFlowMetrics(rowKey, metrics,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ }
+ }
+
+ private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+ Attribute... attributes) throws IOException {
+ for (TimelineMetric metric : metrics) {
+ String metricColumnQualifier = metric.getId();
+ Map<Long, Number> timeseries = metric.getValues();
+ for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+ Long timestamp = timeseriesEntry.getKey();
+ FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
+ metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+ attributes);
+ }
+ }
}
private void storeRelations(byte[] rowKey, TimelineEntity te,
@@ -184,7 +300,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// id3?id4?id5
String compoundValue =
Separator.VALUES.joinEncoded(connectedEntity.getValue());
-
columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
compoundValue);
}
@@ -342,6 +457,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
entityTable.flush();
appToFlowTable.flush();
applicationTable.flush();
+ flowRunTable.flush();
+ flowActivityTable.flush();
}
/**
@@ -364,6 +481,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
LOG.info("closing the application table");
applicationTable.close();
}
+ if (flowRunTable != null) {
+ LOG.info("closing the flow run table");
+ // The close API performs flushing and releases any resources held
+ flowRunTable.close();
+ }
+ if (flowActivityTable != null) {
+ LOG.info("closing the flowActivityTable table");
+ // The close API performs flushing and releases any resources held
+ flowActivityTable.close();
+ }
if (conn != null) {
LOG.info("closing the hbase Connection");
conn.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index e7e51a7..cbcff4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
* This creates the schema for a hbase based backend for storing application
@@ -199,7 +201,7 @@ public class TimelineSchemaCreator {
return commandLine;
}
- private static void createAllTables(Configuration hbaseConf,
+ public static void createAllTables(Configuration hbaseConf,
boolean skipExisting) throws IOException {
Connection conn = null;
@@ -236,6 +238,24 @@ public class TimelineSchemaCreator {
throw e;
}
}
+ try {
+ new FlowRunTable().createTable(admin, hbaseConf);
+ } catch (IOException e) {
+ if (skipExisting) {
+ LOG.warn("Skip and continue on: " + e.getMessage());
+ } else {
+ throw e;
+ }
+ }
+ try {
+ new FlowActivityTable().createTable(admin, hbaseConf);
+ } catch (IOException e) {
+ if (skipExisting) {
+ LOG.warn("Skip and continue on: " + e.getMessage());
+ } else {
+ throw e;
+ }
+ }
} finally {
if (conn != null) {
conn.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
index c028386..802626d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies fully qualified columns for the {@link ApplicationTable}.
@@ -76,9 +77,9 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
- Object inputValue) throws IOException {
+ Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue);
+ inputValue, attributes);
}
public Object readResult(Result result) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index ad1def6..d7b5773 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the application table.
@@ -112,7 +113,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -123,8 +125,9 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
- }
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
/*
* (non-Javadoc)
@@ -137,7 +140,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute...attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -148,7 +152,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
}
/*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
index 423037a..859fdca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import java.io.IOException;
+import java.util.Map;
/**
* Identifies fully qualified columns for the {@link AppToFlowTable}.
@@ -67,9 +69,9 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
- Object inputValue) throws IOException {
+ Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue);
+ inputValue, attributes);
}
public Object readResult(Result result) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
index 3397d62..64c1cda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* A Column represents the way to store a fully qualified column in a specific
@@ -38,12 +39,15 @@ public interface Column<T> {
* column.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
+ * @param attributes Map of attributes for this mutation. used in the coprocessor
+ * to set/read the cell tags. Can be null.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
- Long timestamp, Object inputValue) throws IOException;
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException;
/**
* Get the latest version of this specified column. Note: this call clones the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index f1b7c58..3a2e088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* This class is meant to be used only by explicit Columns, and not directly to
* write by clients.
@@ -58,31 +59,66 @@ public class ColumnHelper<T> {
* Sends a Mutation to the table. The mutations will be buffered and sent over
* the wire as part of a batch.
*
- * @param rowKey identifying the row to write. Nothing gets written when null.
- * @param tableMutator used to modify the underlying HBase table
- * @param columnQualifier column qualifier. Nothing gets written when null.
- * @param timestamp version timestamp. When null the server timestamp will be
- * used.
- * @param inputValue the value to write to the rowKey and column qualifier.
- * Nothing gets written when null.
+ * @param rowKey
+ * identifying the row to write. Nothing gets written when null.
+ * @param tableMutator
+ * used to modify the underlying HBase table
+ * @param columnQualifier
+ * column qualifier. Nothing gets written when null.
+ * @param timestamp
+ * version timestamp. When null the current timestamp multiplied with
+ * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of
+ * app id will be used
+ * @param inputValue
+ * the value to write to the rowKey and column qualifier. Nothing
+ * gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
- byte[] columnQualifier, Long timestamp, Object inputValue)
- throws IOException {
+ byte[] columnQualifier, Long timestamp, Object inputValue,
+ Attribute... attributes) throws IOException {
if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
return;
}
Put p = new Put(rowKey);
+ timestamp = getPutTimestamp(timestamp, attributes);
+ p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+ GenericObjectMapper.write(inputValue));
+ if ((attributes != null) && (attributes.length > 0)) {
+ for (Attribute attribute : attributes) {
+ p.setAttribute(attribute.getName(), attribute.getValue());
+ }
+ }
+ tableMutator.mutate(p);
+ }
+ /*
+ * Figures out the cell timestamp used in the Put for storing into the flow run
+ * table. We would like to left shift the timestamp and supplement it with the
+ * app id so that there are no collisions in the flow run table's cells.
+ */
+ private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
if (timestamp == null) {
- p.addColumn(columnFamilyBytes, columnQualifier,
- GenericObjectMapper.write(inputValue));
- } else {
- p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
- GenericObjectMapper.write(inputValue));
+ timestamp = System.currentTimeMillis();
}
- tableMutator.mutate(p);
+ String appId = getAppIdFromAttributes(attributes);
+ long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
+ timestamp, appId);
+ return supplementedTS;
+ }
+
+ private String getAppIdFromAttributes(Attribute[] attributes) {
+ if (attributes == null) {
+ return null;
+ }
+ String appId = null;
+ for (Attribute attribute : attributes) {
+ if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
+ attribute.getName())) {
+ appId = Bytes.toString(attribute.getValue());
+ }
+ }
+ return appId;
}
/**
@@ -171,7 +207,9 @@ public class ColumnHelper<T> {
for (Entry<Long, byte[]> cell : cells.entrySet()) {
V value =
(V) GenericObjectMapper.read(cell.getValue());
- cellResults.put(cell.getKey(), value);
+ cellResults.put(
+ TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
+ value);
}
}
results.put(columnName, cellResults);
@@ -315,6 +353,27 @@ public class ColumnHelper<T> {
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.
+ * @param qualifier for the remainder of the column.
+ * @return fully sanitized column qualifier that is a combination of prefix
+ * and qualifier. If prefix is null, the result is simply the encoded
+ * qualifier without any separator.
+ */
+ public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+ long qualifier) {
+
+ if (columnPrefixBytes == null) {
+ return Bytes.toBytes(qualifier);
+ }
+
+ // Encode the long qualifier and tag on the column prefix with a
+ // separator.
+ byte[] columnQualifier =
+ Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
+ return columnQualifier;
+ }
+ /**
+ * @param columnPrefixBytes The byte representation for the column prefix.
+ * Should not contain {@link Separator#QUALIFIERS}.
* @param qualifier the byte representation for the remainder of the column.
* @return fully sanitized column qualifier that is a combination of prefix
* and qualifier. If prefix is null, the result is simply the encoded
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 509ff49..db49098 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -23,6 +23,7 @@ import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Used to represent a partially qualified column, where the actual column name
@@ -43,12 +44,36 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
* @param attributes attributes for the mutation that are used by the coprocessor
+ * to set/read the cell tags
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
- String qualifier, Long timestamp, Object inputValue) throws IOException;
+ byte[] qualifier, Long timestamp, Object inputValue,
+ Attribute... attributes) throws IOException;
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey identifying the row to write. Nothing gets written when null.
+ * @param tableMutator used to modify the underlying HBase table. Caller is
+ * responsible to pass a mutator for the table that actually has this
+ * column.
+ * @param qualifier column qualifier. Nothing gets written when null.
+ * @param timestamp version timestamp. When null the server timestamp will be
+ * used.
* @param attributes attributes for the mutation that are used by the coprocessor
+ * to set/read the cell tags
+ * @param inputValue the value to write to the rowKey and column qualifier.
+ * Nothing gets written when null.
+ * @throws IOException
+ */
+ public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ String qualifier, Long timestamp, Object inputValue,
+ Attribute... attributes) throws IOException;
/**
* Get the latest version of this specified column. Note: this call clones the
@@ -81,4 +106,5 @@ public interface ColumnPrefix<T> {
*/
public <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
index 58bdedc7e..371371a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -19,9 +19,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.util.ArrayList;
import java.util.List;
+import java.util.SortedSet;
+import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* bunch of utility functions used across TimelineWriter classes
@@ -36,6 +46,9 @@ public class TimelineWriterUtils {
/** indicator for no limits for splitting */
public static final int NO_LIMIT_SPLIT = -1;
+ /** milliseconds in one day */
+ public static final long MILLIS_ONE_DAY = 86400000L;
+
/**
* Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce
@@ -140,4 +153,176 @@ public class TimelineWriterUtils {
return Long.MAX_VALUE - key;
}
+ /**
+ * returns the timestamp of that day's start (which is midnight, 00:00:00)
+ * for a given input timestamp
+ *
+ * @param ts
+ * @return timestamp of that day's beginning (midnight)
+ */
+ public static long getTopOfTheDayTimestamp(long ts) {
+ long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+ return dayTimestamp;
+ }
+
+ /**
+ * Combines the input array of attributes and the input aggregation operation
+ * into a new array of attributes.
+ *
+ * @param attributes
+ * @param aggOp
+ * @return array of combined attributes
+ */
+ public static Attribute[] combineAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+ Attribute[] combinedAttributes = new Attribute[newLength];
+
+ if (attributes != null) {
+ System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+ }
+
+ if (aggOp != null) {
+ Attribute a2 = aggOp.getAttribute();
+ combinedAttributes[newLength - 1] = a2;
+ }
+ return combinedAttributes;
+ }
+
+ /**
+ * Returns a number for the new array size. The new array is the combination
+ * of input array of attributes and the input aggregation operation.
+ *
+ * @param attributes
+ * @param aggOp
+ * @return the size for the new array
+ */
+ private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int oldLength = getAttributesLength(attributes);
+ int aggLength = getAppOpLength(aggOp);
+ return oldLength + aggLength;
+ }
+
+ private static int getAppOpLength(AggregationOperation aggOp) {
+ if (aggOp != null) {
+ return 1;
+ }
+ return 0;
+ }
+
+ private static int getAttributesLength(Attribute[] attributes) {
+ if (attributes != null) {
+ return attributes.length;
+ }
+ return 0;
+ }
+
+ /**
+ * checks if an application has finished
+ *
+ * @param te
+ * @return true if application has finished else false
+ */
+ public static boolean isApplicationFinished(TimelineEntity te) {
+ SortedSet<TimelineEvent> allEvents = te.getEvents();
+ if ((allEvents != null) && (allEvents.size() > 0)) {
+ TimelineEvent event = allEvents.last();
+ if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * get the time at which an app finished
+ *
+ * @param te
+ * @return the app's finish time if the application has finished, 0 otherwise
+ */
+ public static long getApplicationFinishedTime(TimelineEntity te) {
+ SortedSet<TimelineEvent> allEvents = te.getEvents();
+ if ((allEvents != null) && (allEvents.size() > 0)) {
+ TimelineEvent event = allEvents.last();
+ if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+ return event.getTimestamp();
+ }
+ }
+ return 0l;
+ }
+
+ /**
+ * Checks if the input TimelineEntity object is an ApplicationEntity.
+ *
+ * @param te
+ * @return true if input is an ApplicationEntity, false otherwise
+ */
+ public static boolean isApplicationEntity(TimelineEntity te) {
+ return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+ }
+
+ /**
+ * Checks for the APPLICATION_CREATED event.
+ *
+ * @param te
+ * @return true if the APPLICATION_CREATED event exists, false otherwise
+ */
+ public static boolean isApplicationCreated(TimelineEntity te) {
+ if (isApplicationEntity(te)) {
+ for (TimelineEvent event : te.getEvents()) {
+ if (event.getId()
+ .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns the first seen aggregation operation as seen in the list of input
+ * tags or null otherwise
+ *
+ * @param tags
+ * @return AggregationOperation
+ */
+ public static AggregationOperation getAggregationOperationFromTagsList(
+ List<Tag> tags) {
+ for (AggregationOperation aggOp : AggregationOperation.values()) {
+ for (Tag tag : tags) {
+ if (tag.getType() == aggOp.getTagType()) {
+ return aggOp;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a {@link Tag} from the input attribute.
+ *
+ * @param attribute
+ * @return Tag
+ */
+ public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
+ // attribute could be either an Aggregation Operation or
+ // an Aggregation Dimension
+ // Get the Tag type from either
+ AggregationOperation aggOp = AggregationOperation
+ .getAggregationOperation(attribute.getKey());
+ if (aggOp != null) {
+ Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+ return t;
+ }
+
+ AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
+ .getAggregationCompactionDimension(attribute.getKey());
+ if (aggCompactDim != null) {
+ Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ return t;
+ }
+ return null;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
new file mode 100644
index 0000000..555b64e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * Utility class that allows HBase coprocessors to interact with unique
+ * timestamps.
+ */
+public class TimestampGenerator {
+
+ /*
+ * if this is changed, then reading cell timestamps written with older
+ * multiplier value will not work
+ */
+ public static final long TS_MULTIPLIER = 1000L;
+
+ private final AtomicLong lastTimestamp = new AtomicLong();
+
+ /**
+ * Returns the current wall clock time in milliseconds, multiplied by the
+ * required precision.
+ */
+ public long currentTime() {
+ // We want to align cell timestamps with current time.
+ // cell timestamps must not be less than
+ // System.currentTimeMillis() * TS_MULTIPLIER.
+ return System.currentTimeMillis() * TS_MULTIPLIER;
+ }
+
+ /**
+ * Returns a timestamp value unique within the scope of this
+ * {@code TimestampGenerator} instance. For usage by HBase
+ * {@code RegionObserver} coprocessors, this normally means unique within a
+ * given region.
+ *
+ * Unlikely scenario of generating a non-unique timestamp: if there is a
+ * sustained rate of more than 1M hbase writes per second AND if region fails
+ * over within that time range of timestamps being generated then there may be
+ * collisions writing to a cell version of the same column.
+ */
+ public long getUniqueTimestamp() {
+ long lastTs;
+ long nextTs;
+ do {
+ lastTs = lastTimestamp.get();
+ nextTs = Math.max(lastTs + 1, currentTime());
+ } while (!lastTimestamp.compareAndSet(lastTs, nextTs));
+ return nextTs;
+ }
+
+ /**
+ * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
+ * application id
+ *
+ * Unlikely scenario of generating a timestamp that is a duplicate: If more
+ * than a 1000 concurrent apps are running in one flow run AND write to same
+ * column at the same time, then say appId of 1001 will overlap with appId of
+ * 001 and there may be collisions for that flow run's specific column.
+ *
+ * @param incomingTS
+ * @param appId
+ * @return a timestamp multiplied with TS_MULTIPLIER and last few digits of
+ * application id
+ */
+ public static long getSupplementedTimestamp(long incomingTS, String appId) {
+ long suffix = getAppIdSuffix(appId);
+ long outgoingTS = incomingTS * TS_MULTIPLIER + suffix;
+ return outgoingTS;
+
+ }
+
+ private static long getAppIdSuffix(String appIdStr) {
+ if (appIdStr == null) {
+ return 0L;
+ }
+ ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+ long id = appId.getId() % TS_MULTIPLIER;
+ return id;
+ }
+
+ /**
+ * truncates the last few digits of the timestamp which were supplemented by
+ * the TimestampGenerator#getSupplementedTimestamp function
+ *
+ * @param incomingTS
+ * @return a truncated timestamp value
+ */
+ public static long getTruncatedTimestamp(long incomingTS) {
+ return incomingTS / TS_MULTIPLIER;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
deleted file mode 100644
index 32577fb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 26e7748..8ae19b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies fully qualified columns for the {@link EntityTable}.
@@ -81,9 +83,9 @@ public enum EntityColumn implements Column<EntityTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
- Object inputValue) throws IOException {
+ Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue);
+ inputValue, attributes);
}
public Object readResult(Result result) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 75ff742..0d4e5a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the entity table.
@@ -108,11 +109,13 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
- * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
*/
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -123,8 +126,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
- }
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
/*
* (non-Javadoc)
@@ -137,7 +141,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -148,8 +153,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
- }
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
/*
* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
new file mode 100644
index 0000000..ff12c7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the compaction dimensions for the data in the {@link FlowRunTable}
+ * .
+ */
+public enum AggregationCompactionDimension {
+
+ /**
+ * the application id
+ */
+ APPLICATION_ID((byte) 101);
+
+ private byte tagType;
+ private byte[] inBytes;
+
+ private AggregationCompactionDimension(byte tagType) {
+ this.tagType = tagType;
+ this.inBytes = Bytes.toBytes(this.name());
+ }
+
+ public Attribute getAttribute(String attributeValue) {
+ return new Attribute(this.name(), Bytes.toBytes(attributeValue));
+ }
+
+ public byte getTagType() {
+ return tagType;
+ }
+
+ public byte[] getInBytes() {
+ return this.inBytes.clone();
+ }
+
+ public static AggregationCompactionDimension getAggregationCompactionDimension(
+ String aggCompactDimStr) {
+ for (AggregationCompactionDimension aggDim : AggregationCompactionDimension
+ .values()) {
+ if (aggDim.name().equals(aggCompactDimStr)) {
+ return aggDim;
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
new file mode 100644
index 0000000..c635ce6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
+ * The numbers used for tagType are prime numbers.
+ */
+public enum AggregationOperation {
+
+  /**
+   * When the flow was started.
+   */
+  MIN((byte) 71),
+
+  /**
+   * When it ended.
+   */
+  MAX((byte) 73),
+
+  /**
+   * The metrics of the flow
+   */
+  SUM((byte) 79),
+
+  /**
+   * application running
+   */
+  SUM_FINAL((byte) 83),
+
+  /**
+   * compact
+   */
+  COMPACT((byte) 89);
+
+  // Both fields are fixed per enum constant; final makes that explicit.
+  private final byte tagType;
+  private final byte[] inBytes;
+
+  private AggregationOperation(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  /**
+   * Creates an {@link Attribute} keyed by this operation's name with the
+   * operation name bytes as its value.
+   *
+   * @return a new {@link Attribute} for this operation
+   */
+  public Attribute getAttribute() {
+    return new Attribute(this.name(), this.inBytes);
+  }
+
+  /**
+   * @return the tag type byte for this operation
+   */
+  public byte getTagType() {
+    return tagType;
+  }
+
+  /**
+   * @return a defensive copy of this operation's name as bytes
+   */
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  /**
+   * returns the AggregationOperation enum that represents that string
+   * @param aggOpStr string representation of the operation name
+   * @return the AggregationOperation enum that represents that string, or
+   *         null if there is no match
+   */
+  public static AggregationOperation getAggregationOperation(String aggOpStr) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      if (aggOp.name().equals(aggOpStr)) {
+        return aggOp;
+      }
+    }
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
new file mode 100644
index 0000000..d3de518
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
+ */
+public class Attribute {
+  private final String name;
+  private final byte[] value;
+
+  /**
+   * @param name the attribute name
+   * @param value the attribute value; defensively copied so later mutation of
+   *          the caller's array cannot affect this instance
+   */
+  public Attribute(String name, byte[] value) {
+    this.name = name;
+    this.value = value.clone();
+  }
+
+  /**
+   * @return the attribute name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return a defensive copy of the attribute value
+   */
+  public byte[] getValue() {
+    return value.clone();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
new file mode 100644
index 0000000..d991b42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow activity table column families.
+ */
+public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowActivityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  // Returns a copy so callers cannot mutate the cached family bytes.
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
new file mode 100644
index 0000000..b899e5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowActivityTable}.
+ */
+public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> {
+
+  /**
+   * To store run ids of the flows
+   */
+  RUN_ID(FlowActivityColumnFamily.INFO, "r", null);
+
+  private final ColumnHelper<FlowActivityTable> column;
+  private final ColumnFamily<FlowActivityTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily
+   *          that this column is stored in.
+   * @param columnPrefix
+   *          for this column.
+   * @param aggOp
+   *          the aggregation operation attached to every put through this
+   *          column, or null for none.
+   */
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp) {
+    column = new ColumnHelper<FlowActivityTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+          .encode(columnPrefix));
+    }
+    this.aggOp = aggOp;
+  }
+
+  /**
+   * @return the column name value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  /**
+   * @return a copy of the encoded column prefix bytes, or null when this
+   *         column has no prefix (the qualifier is the entire column name)
+   */
+  public byte[] getColumnPrefixBytes() {
+    // Guard against NPE: the prefix is documented as nullable above.
+    return (columnPrefixBytes == null) ? null : columnPrefixBytes.clone();
+  }
+
+  /**
+   * @return the aggregation operation for this column, or null for none
+   */
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    // Fold this column's aggregation operation into the caller's attributes.
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   */
+  public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
+      Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+   * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null
+   */
+  public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix
+        .values()) {
+      // Find a match based only on name.
+      if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
+        return flowActivityColPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final FlowActivityColumnPrefix columnFor(
+      FlowActivityColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix
+        .values()) {
+      // Find a match based column family and on name.
+      if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (flowActivityColumnPrefix
+              .getColumnPrefix() == null)) || (flowActivityColumnPrefix
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return flowActivityColumnPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    // NOTE(review): unlike the byte[] overload above, this passes a null
+    // timestamp (so the cell timestamp is assigned downstream) instead of
+    // forwarding the 'timestamp' parameter — confirm the asymmetry is
+    // intentional.
+    column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
+        combinedAttributes);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
new file mode 100644
index 0000000..19e4e83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow activity table. The key layout is
+ * {@code clusterId!dayTimestamp!user!flowId}.
+ */
+public class FlowActivityRowKey {
+
+  // cluster that the flow ran in
+  private final String clusterId;
+  // top-of-the-day timestamp bucketing the activity to a calendar day
+  private final long dayTs;
+  // user that ran the flow
+  private final String userId;
+  // identifier of the flow itself
+  private final String flowId;
+
+  public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    this.clusterId = clusterId;
+    this.dayTs = dayTs;
+    this.userId = userId;
+    this.flowId = flowId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public long getDayTimestamp() {
+    return dayTs;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * Will insert into current day's record in the table
+   * @param clusterId cluster the flow ran in
+   * @param userId user that ran the flow
+   * @param flowId identifier of the flow
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+    // Bucket "now" to the top of the current day before building the key.
+    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        .currentTimeMillis());
+    return getRowKey(clusterId, dayTs, userId, flowId);
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * @param clusterId cluster the flow ran in
+   * @param dayTs top-of-the-day timestamp for the activity
+   * @param userId user that ran the flow
+   * @param flowId identifier of the flow
+   * @return byte array for the row key
+   */
+  public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    // String components are encoded so they cannot collide with the
+    // separator; the timestamp is inverted (presumably so newer days sort
+    // first in HBase's ascending scan order — TODO confirm invert()
+    // semantics).
+    return Separator.QUALIFIERS.join(
+        Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
+        Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
+  }
+
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   * Inverse of {@link #getRowKey(String, long, String, String)}.
+   *
+   * @throws IllegalArgumentException if the key has fewer than 4 components
+   */
+  public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 4) {
+      throw new IllegalArgumentException("the row key is not valid for "
+          + "a flow activity");
+    }
+
+    // Decode each component and re-invert the day timestamp to recover the
+    // original value written by getRowKey().
+    String clusterId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[0]));
+    long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    String userId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[2]));
+    String flowId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[3]));
+    return new FlowActivityRowKey(clusterId, dayTs, userId, flowId);
+  }
+}