Posted to common-commits@hadoop.apache.org by va...@apache.org on 2016/06/10 15:37:38 UTC
[4/4] hadoop git commit: YARN-5170. Eliminate singleton converters and static method access
YARN-5170. Eliminate singleton converters and static method access
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8271a57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8271a57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8271a57
Branch: refs/heads/YARN-2928
Commit: b8271a578977e3dc9c94f72635375c7f76ab2cc1
Parents: c39b9c4
Author: Varun Saxena <va...@apache.org>
Authored: Fri Jun 10 20:56:55 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Fri Jun 10 20:56:55 2016 +0530
----------------------------------------------------------------------
.../storage/TestHBaseTimelineStorage.java | 43 ++-
.../flow/TestHBaseStorageFlowActivity.java | 11 +-
.../storage/flow/TestHBaseStorageFlowRun.java | 11 +-
.../flow/TestHBaseStorageFlowRunCompaction.java | 54 +--
.../reader/filter/TimelineFilterUtils.java | 38 ---
.../storage/HBaseTimelineWriterImpl.java | 226 ++++++++-----
.../storage/application/ApplicationColumn.java | 2 +-
.../application/ApplicationColumnPrefix.java | 3 +-
.../storage/application/ApplicationRowKey.java | 173 +++++++---
.../application/ApplicationRowKeyConverter.java | 130 --------
.../application/ApplicationRowKeyPrefix.java | 69 ++++
.../storage/apptoflow/AppToFlowRowKey.java | 93 +++++-
.../apptoflow/AppToFlowRowKeyConverter.java | 96 ------
.../storage/common/AppIdKeyConverter.java | 11 +-
.../storage/common/EventColumnName.java | 15 +
.../common/EventColumnNameConverter.java | 12 +-
.../storage/common/LongConverter.java | 27 +-
.../storage/common/LongKeyConverter.java | 14 +-
.../storage/common/RowKeyPrefix.java | 42 +++
.../storage/common/StringKeyConverter.java | 7 +-
.../storage/common/TimelineStorageUtils.java | 171 ----------
.../storage/entity/EntityColumn.java | 3 +-
.../storage/entity/EntityColumnPrefix.java | 3 +-
.../storage/entity/EntityRowKey.java | 187 ++++++++---
.../storage/entity/EntityRowKeyConverter.java | 143 --------
.../storage/entity/EntityRowKeyPrefix.java | 74 +++++
.../storage/flow/FlowActivityRowKey.java | 162 ++++++---
.../flow/FlowActivityRowKeyConverter.java | 115 -------
.../storage/flow/FlowActivityRowKeyPrefix.java | 60 ++++
.../storage/flow/FlowRunColumn.java | 6 +-
.../storage/flow/FlowRunColumnPrefix.java | 2 +-
.../storage/flow/FlowRunRowKey.java | 129 +++++--
.../storage/flow/FlowRunRowKeyConverter.java | 120 -------
.../storage/flow/FlowRunRowKeyPrefix.java | 54 +++
.../storage/reader/ApplicationEntityReader.java | 103 +++---
.../reader/FlowActivityEntityReader.java | 26 +-
.../storage/reader/FlowRunEntityReader.java | 80 ++---
.../storage/reader/GenericEntityReader.java | 333 ++++++++++---------
.../storage/reader/TimelineEntityReader.java | 136 +++++++-
.../storage/common/TestKeyConverters.java | 287 ++++------------
.../storage/common/TestRowKeys.java | 244 ++++++++++++++
41 files changed, 1883 insertions(+), 1632 deletions(-)
----------------------------------------------------------------------
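At a high level, the patch replaces singleton converters (Foo.getInstance()) and static row-key factory methods (Foo.getRowKey(...)) with ordinary instances. A minimal before/after sketch of the pattern, using names that appear in the hunks below (cluster, user, flow, runid, appId and result are assumed to be in scope, as in the tests):

    // Before: static factory method plus singleton converter.
    byte[] oldRowKey =
        ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
    Map<String, Object> oldInfo =
        ApplicationColumnPrefix.INFO.readResults(result,
            StringKeyConverter.getInstance());

    // After: the row key is an object and the converter is a plain instance.
    ApplicationRowKey applicationRowKey =
        new ApplicationRowKey(cluster, user, flow, runid, appId);
    byte[] newRowKey = applicationRowKey.getRowKey();
    Map<String, Object> newInfo =
        ApplicationColumnPrefix.INFO.readResults(result,
            new StringKeyConverter());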
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 7b647eb..fd5a7f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -70,12 +70,14 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.After;
import org.junit.AfterClass;
@@ -649,8 +651,9 @@ public class TestHBaseTimelineStorage {
infoMap.putAll(infoMap1);
// retrieve the row
- byte[] rowKey =
- ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
+ ApplicationRowKey applicationRowKey =
+ new ApplicationRowKey(cluster, user, flow, runid, appId);
+ byte[] rowKey = applicationRowKey.getRowKey();
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
@@ -674,7 +677,7 @@ public class TestHBaseTimelineStorage {
Map<String, Object> infoColumns =
ApplicationColumnPrefix.INFO.readResults(result,
- StringKeyConverter.getInstance());
+ new StringKeyConverter());
assertEquals(infoMap, infoColumns);
// Remember isRelatedTo is of type Map<String, Set<String>>
@@ -710,15 +713,16 @@ public class TestHBaseTimelineStorage {
}
}
+ KeyConverter<String> stringKeyConverter = new StringKeyConverter();
// Configuration
Map<String, Object> configColumns =
- ApplicationColumnPrefix.CONFIG.readResults(result,
- StringKeyConverter.getInstance());
+ ApplicationColumnPrefix.CONFIG
+ .readResults(result, stringKeyConverter);
assertEquals(conf, configColumns);
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
- ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(
- result, StringKeyConverter.getInstance());
+ ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result,
+ stringKeyConverter);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
matchMetrics(metricValues, metricMap);
@@ -908,7 +912,8 @@ public class TestHBaseTimelineStorage {
// scan the table and see that entity exists
Scan s = new Scan();
byte[] startRow =
- EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+ new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
+ .getRowKeyPrefix();
s.setStartRow(startRow);
s.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
@@ -916,6 +921,7 @@ public class TestHBaseTimelineStorage {
int rowCount = 0;
int colCount = 0;
+ KeyConverter<String> stringKeyConverter = new StringKeyConverter();
for (Result result : scanner) {
if (result != null && !result.isEmpty()) {
rowCount++;
@@ -936,7 +942,7 @@ public class TestHBaseTimelineStorage {
Map<String, Object> infoColumns =
EntityColumnPrefix.INFO.readResults(result,
- StringKeyConverter.getInstance());
+ new StringKeyConverter());
assertEquals(infoMap, infoColumns);
// Remember isRelatedTo is of type Map<String, Set<String>>
@@ -975,13 +981,12 @@ public class TestHBaseTimelineStorage {
// Configuration
Map<String, Object> configColumns =
- EntityColumnPrefix.CONFIG.readResults(result,
- StringKeyConverter.getInstance());
+ EntityColumnPrefix.CONFIG.readResults(result, stringKeyConverter);
assertEquals(conf, configColumns);
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
- EntityColumnPrefix.METRIC.readResultsWithTimestamps(
- result, StringKeyConverter.getInstance());
+ EntityColumnPrefix.METRIC.readResultsWithTimestamps(result,
+ stringKeyConverter);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
matchMetrics(metricValues, metricMap);
@@ -1116,8 +1121,9 @@ public class TestHBaseTimelineStorage {
hbi.stop();
// retrieve the row
- byte[] rowKey =
- ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
+ ApplicationRowKey applicationRowKey =
+ new ApplicationRowKey(cluster, user, flow, runid, appName);
+ byte[] rowKey = applicationRowKey.getRowKey();
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
@@ -1132,7 +1138,7 @@ public class TestHBaseTimelineStorage {
Map<EventColumnName, Object> eventsResult =
ApplicationColumnPrefix.EVENT.readResults(result,
- EventColumnNameConverter.getInstance());
+ new EventColumnNameConverter());
// there should be only one event
assertEquals(1, eventsResult.size());
for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {
@@ -1212,7 +1218,8 @@ public class TestHBaseTimelineStorage {
String appName = ApplicationId.newInstance(System.currentTimeMillis() +
9000000L, 1).toString();
byte[] startRow =
- EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+ new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
+ .getRowKeyPrefix();
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
hbi.stop();
// scan the table and see that entity exists
@@ -1234,7 +1241,7 @@ public class TestHBaseTimelineStorage {
Map<EventColumnName, Object> eventsResult =
EntityColumnPrefix.EVENT.readResults(result,
- EventColumnNameConverter.getInstance());
+ new EventColumnNameConverter());
// there should be only one event
assertEquals(1, eventsResult.size());
for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {
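The scans above now derive their start row from the new row key prefix classes instead of the removed static getRowKeyPrefix methods. A condensed sketch of the resulting scan set-up (cluster, user, flow, runid, appName and the HBase Configuration c1 are assumed to be in scope, as in the test):

    // Build the scan start row from a row key prefix object; every row
    // returned by the scanner shares this encoded prefix.
    Scan s = new Scan();
    byte[] startRow =
        new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
            .getRowKeyPrefix();
    s.setStartRow(startRow);
    s.setMaxVersions(Integer.MAX_VALUE);
    Connection conn = ConnectionFactory.createConnection(c1);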
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 589b78d..37490ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -158,7 +158,7 @@ public class TestHBaseStorageFlowActivity {
Table table1 = conn.getTable(TableName
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
byte[] startRow =
- FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
+ new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey();
Get g = new Get(startRow);
Result r1 = table1.get(g);
assertNotNull(r1);
@@ -278,11 +278,12 @@ public class TestHBaseStorageFlowActivity {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow =
- FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
+ new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey();
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow =
- FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
+ new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow)
+ .getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
@@ -420,11 +421,11 @@ public class TestHBaseStorageFlowActivity {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow =
- FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
+ new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey();
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow =
- FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
+ new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index a443b50..328b25a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -59,8 +59,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -224,7 +224,7 @@ public class TestHBaseStorageFlowRun {
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
- byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
Get g = new Get(startRow);
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
@@ -354,10 +354,11 @@ public class TestHBaseStorageFlowRun {
long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
- byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
s.setStartRow(startRow);
String clusterStop = cluster + "1";
- byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+ byte[] stopRow =
+ new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
@@ -629,7 +630,7 @@ public class TestHBaseStorageFlowRun {
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
- byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
Get g = new Get(startRow);
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 6b0ee5c..e1bef53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -19,24 +19,24 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
-import java.util.Map;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -46,21 +46,21 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/**
* Tests the FlowRun and FlowActivity Tables
@@ -194,10 +194,11 @@ public class TestHBaseStorageFlowRunCompaction {
long runid, Configuration c1, int valueCount) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
- byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
s.setStartRow(startRow);
String clusterStop = cluster + "1";
- byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+ byte[] stopRow =
+ new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
@@ -302,8 +303,9 @@ public class TestHBaseStorageFlowRunCompaction {
cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
currentColumnCells.add(c4);
- List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ List<Cell> cells =
+ fs.processSummationMajorCompaction(currentColumnCells,
+ new LongConverter(), currentTimestamp);
assertNotNull(cells);
// we should be getting back 4 cells
@@ -387,8 +389,9 @@ public class TestHBaseStorageFlowRunCompaction {
cellTsNotFinal++;
}
- List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ List<Cell> cells =
+ fs.processSummationMajorCompaction(currentColumnCells,
+ new LongConverter(), currentTimestamp);
assertNotNull(cells);
// we should be getting back count + 1 cells
@@ -489,8 +492,9 @@ public class TestHBaseStorageFlowRunCompaction {
cellTsNotFinal++;
}
- List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ List<Cell> cells =
+ fs.processSummationMajorCompaction(currentColumnCells,
+ new LongConverter(), currentTimestamp);
assertNotNull(cells);
// we should be getting back
@@ -554,7 +558,7 @@ public class TestHBaseStorageFlowRunCompaction {
130L, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ new LongConverter(), currentTimestamp);
assertNotNull(cells);
// we should be getting back two cells
@@ -602,7 +606,7 @@ public class TestHBaseStorageFlowRunCompaction {
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ new LongConverter(), currentTimestamp);
assertNotNull(cells);
// we should not get the same cell back
// but we get back the flow cell
@@ -639,7 +643,7 @@ public class TestHBaseStorageFlowRunCompaction {
currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ new LongConverter(), currentTimestamp);
assertNotNull(cells);
// we expect the same cell back
assertEquals(1, cells.size());
@@ -653,15 +657,19 @@ public class TestHBaseStorageFlowRunCompaction {
FlowScanner fs = getFlowScannerForTestingCompaction();
long currentTimestamp = System.currentTimeMillis();
+ LongConverter longConverter = new LongConverter();
+
SortedSet<Cell> currentColumnCells = null;
- List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ List<Cell> cells =
+ fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+ currentTimestamp);
assertNotNull(cells);
assertEquals(0, cells.size());
currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
- cells = fs.processSummationMajorCompaction(currentColumnCells,
- LongConverter.getInstance(), currentTimestamp);
+ cells =
+ fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+ currentTimestamp);
assertNotNull(cells);
assertEquals(0, cells.size());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
index 036746b..cccae26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -31,14 +31,9 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@@ -209,39 +204,6 @@ public final class TimelineFilterUtils {
return singleColValFilter;
}
- private static <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
- String column) {
- if (colPrefix == ApplicationColumnPrefix.EVENT ||
- colPrefix == EntityColumnPrefix.EVENT) {
- return EventColumnNameConverter.getInstance().encode(
- new EventColumnName(column, null, null));
- } else {
- return StringKeyConverter.getInstance().encode(column);
- }
- }
-
- /**
- * Create a filter list of qualifier filters based on passed set of columns.
- *
- * @param <T> Describes the type of column prefix.
- * @param colPrefix Column Prefix.
- * @param columns set of column qualifiers.
- * @return filter list.
- */
- public static <T> FilterList createFiltersFromColumnQualifiers(
- ColumnPrefix<T> colPrefix, Set<String> columns) {
- FilterList list = new FilterList(Operator.MUST_PASS_ONE);
- for (String column : columns) {
- // For columns which have compound column qualifiers (eg. events), we need
- // to include the required separator.
- byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
- list.addFilter(new QualifierFilter(CompareOp.EQUAL,
- new BinaryPrefixComparator(
- colPrefix.getColumnPrefixBytes(compoundColQual))));
- }
- return list;
- }
-
/**
* Fetch columns from filter list containing exists and multivalue equality
* filters. This is done to fetch only required columns from back-end and
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index f8b5a65..3511a2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
@@ -45,11 +46,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -86,6 +86,17 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
private TypedBufferedMutator<FlowRunTable> flowRunTable;
+ /**
+ * Used to convert string key components to and from storage format.
+ */
+ private final KeyConverter<String> stringKeyConverter =
+ new StringKeyConverter();
+
+ /**
+ * Used to convert Long key components to and from storage format.
+ */
+ private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
public HBaseTimelineWriterImpl() {
super(HBaseTimelineWriterImpl.class.getName());
}
@@ -138,12 +149,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// if the entity is the application, the destination is the application
// table
- boolean isApplication = TimelineStorageUtils.isApplicationEntity(te);
- byte[] rowKey = isApplication ?
- ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
- appId) :
- EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
- te.getType(), te.getId());
+ boolean isApplication = isApplicationEntity(te);
+ byte[] rowKey;
+ if (isApplication) {
+ ApplicationRowKey applicationRowKey =
+ new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+ appId);
+ rowKey = applicationRowKey.getRowKey();
+ } else {
+ EntityRowKey entityRowKey =
+ new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+ te.getType(), te.getId());
+ rowKey = entityRowKey.getRowKey();
+ }
storeInfo(rowKey, te, flowVersion, isApplication);
storeEvents(rowKey, te.getEvents(), isApplication);
@@ -152,102 +170,101 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
storeRelations(rowKey, te, isApplication);
if (isApplication) {
- TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te,
- ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ TimelineEvent event =
+ getApplicationEvent(te,
+ ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ FlowRunRowKey flowRunRowKey =
+ new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
if (event != null) {
- onApplicationCreated(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, te, event.getTimestamp());
+ AppToFlowRowKey appToFlowRowKey =
+ new AppToFlowRowKey(clusterId, appId);
+ onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId,
+ flowVersion, te, event.getTimestamp());
}
// if it's an application entity, store metrics
- storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
- appId, te);
+ storeFlowMetricsAppRunning(flowRunRowKey, appId, te);
// if application has finished, store its finish time and write final
// values of all metrics
- event = TimelineStorageUtils.getApplicationEvent(te,
+ event = getApplicationEvent(te,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
if (event != null) {
- onApplicationFinished(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, te, event.getTimestamp());
+ onApplicationFinished(flowRunRowKey, flowVersion, appId, te,
+ event.getTimestamp());
}
}
}
return putStatus;
}
- private void onApplicationCreated(String clusterId, String userId,
- String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntity te, long appCreatedTimeStamp) throws IOException {
+ private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+ AppToFlowRowKey appToFlowRowKey, String appId, String userId,
+ String flowVersion, TimelineEntity te, long appCreatedTimeStamp)
+ throws IOException {
+
+ String flowName = flowRunRowKey.getFlowName();
+ Long flowRunId = flowRunRowKey.getFlowRunId();
+
// store in App to flow table
- storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
+ byte[] rowKey = appToFlowRowKey.getRowKey();
+ AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
+ AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+ AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
+
// store in flow run table
- storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, te);
- // store in flow activity table
- storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, appCreatedTimeStamp);
- }
+ storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
- /*
- * updates the {@link FlowActivityTable} with the Application TimelineEntity
- * information
- */
- private void storeInFlowActivityTable(String clusterId, String userId,
- String flowName, String flowVersion, long flowRunId, String appId,
- long activityTimeStamp) throws IOException {
- byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
- userId, flowName);
- byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId);
- FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
- null, flowVersion,
+ // store in flow activity table
+ byte[] flowActivityRowKeyBytes =
+ new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+ appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
+ .getRowKey();
+ byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+ FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
+ flowActivityTable, qualifier, null, flowVersion,
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
/*
* updates the {@link FlowRunTable} with Application Created information
*/
- private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
- String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntity te) throws IOException {
- byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
- flowRunId);
+ private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+ String appId, TimelineEntity te) throws IOException {
+ byte[] rowKey = flowRunRowKey.getRowKey();
FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
te.getCreatedTime(),
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
- private void storeInAppToFlowTable(String clusterId, String userId,
- String flowName, long flowRunId, String appId, TimelineEntity te)
- throws IOException {
- byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
- AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
- AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
- AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
- }
/*
* updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
* application has finished
*/
- private void onApplicationFinished(String clusterId, String userId,
- String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntity te, long appFinishedTimeStamp) throws IOException {
+ private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
+ String flowVersion, String appId, TimelineEntity te,
+ long appFinishedTimeStamp) throws IOException {
// store in flow run table
- storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
- appId, te, appFinishedTimeStamp);
+ storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
+ appFinishedTimeStamp);
// indicate in the flow activity table that the app has finished
- storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
- flowRunId, appId, appFinishedTimeStamp);
+ byte[] rowKey =
+ new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+ appFinishedTimeStamp, flowRunRowKey.getUserId(),
+ flowRunRowKey.getFlowName()).getRowKey();
+ byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+ FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+ null, flowVersion,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
/*
* Update the {@link FlowRunTable} with Application Finished information
*/
- private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
- String flowName, long flowRunId, String appId, TimelineEntity te,
- long appFinishedTimeStamp) throws IOException {
- byte[] rowKey =
- FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
+ private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+ String appId, TimelineEntity te, long appFinishedTimeStamp)
+ throws IOException {
+ byte[] rowKey = flowRunRowKey.getRowKey();
Attribute attributeAppId =
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
@@ -264,13 +281,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
/*
* Updates the {@link FlowRunTable} with Application Metrics
*/
- private void storeFlowMetricsAppRunning(String clusterId, String userId,
- String flowName, long flowRunId, String appId, TimelineEntity te)
- throws IOException {
+ private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
+ String appId, TimelineEntity te) throws IOException {
Set<TimelineMetric> metrics = te.getMetrics();
if (metrics != null) {
- byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
- flowRunId);
+ byte[] rowKey = flowRunRowKey.getRowKey();
storeFlowMetrics(rowKey, metrics,
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
AggregationOperation.SUM.getAttribute());
@@ -280,8 +295,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
Attribute... attributes) throws IOException {
for (TimelineMetric metric : metrics) {
- byte[] metricColumnQualifier =
- StringKeyConverter.getInstance().encode(metric.getId());
+ byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
Map<Long, Number> timeseries = metric.getValues();
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
Long timestamp = timeseriesEntry.getKey();
@@ -320,8 +334,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
String compoundValue =
Separator.VALUES.joinEncoded(connectedEntity.getValue());
columnPrefix.store(rowKey, table,
- StringKeyConverter.getInstance().encode(connectedEntity.getKey()),
- null, compoundValue);
+ stringKeyConverter.encode(connectedEntity.getKey()), null,
+ compoundValue);
}
}
@@ -341,7 +355,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (info != null) {
for (Map.Entry<String, Object> entry : info.entrySet()) {
ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
- StringKeyConverter.getInstance().encode(entry.getKey()), null,
+ stringKeyConverter.encode(entry.getKey()), null,
entry.getValue());
}
}
@@ -355,7 +369,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (info != null) {
for (Map.Entry<String, Object> entry : info.entrySet()) {
EntityColumnPrefix.INFO.store(rowKey, entityTable,
- StringKeyConverter.getInstance().encode(entry.getKey()), null,
+ stringKeyConverter.encode(entry.getKey()), null,
entry.getValue());
}
}
@@ -371,8 +385,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
return;
}
for (Map.Entry<String, String> entry : config.entrySet()) {
- byte[] configKey =
- StringKeyConverter.getInstance().encode(entry.getKey());
+ byte[] configKey = stringKeyConverter.encode(entry.getKey());
if (isApplication) {
ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
configKey, null, entry.getValue());
@@ -392,7 +405,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (metrics != null) {
for (TimelineMetric metric : metrics) {
byte[] metricColumnQualifier =
- StringKeyConverter.getInstance().encode(metric.getId());
+ stringKeyConverter.encode(metric.getId());
Map<Long, Number> timeseries = metric.getValues();
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
Long timestamp = timeseriesEntry.getKey();
@@ -425,12 +438,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
"! Using the current timestamp");
eventTimestamp = System.currentTimeMillis();
}
- EventColumnNameConverter converter =
- EventColumnNameConverter.getInstance();
Map<String, Object> eventInfo = event.getInfo();
if ((eventInfo == null) || (eventInfo.size() == 0)) {
- byte[] columnQualifierBytes = converter.encode(
- new EventColumnName(eventId, eventTimestamp, null));
+ byte[] columnQualifierBytes =
+ new EventColumnName(eventId, eventTimestamp, null)
+ .getColumnQualifier();
if (isApplication) {
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
columnQualifierBytes, null, Separator.EMPTY_BYTES);
@@ -441,9 +453,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
} else {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
// eventId=infoKey
- byte[] columnQualifierBytes = converter.encode(
- new EventColumnName(eventId, eventTimestamp,
- info.getKey()));
+ byte[] columnQualifierBytes =
+ new EventColumnName(eventId, eventTimestamp, info.getKey())
+ .getColumnQualifier();
if (isApplication) {
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
columnQualifierBytes, null, info.getValue());
@@ -459,12 +471,56 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
}
}
+ /**
+ * Checks if the input TimelineEntity object is an ApplicationEntity.
+ *
+ * @param te TimelineEntity object.
+ * @return true if input is an ApplicationEntity, false otherwise
+ */
+ static boolean isApplicationEntity(TimelineEntity te) {
+ return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+ }
+
+ /**
+ * @param te TimelineEntity object.
+ * @param eventId id of the event to be fetched.
+ * @return TimelineEvent if the TimelineEntity contains the desired event,
+ * null otherwise.
+ */
+ private static TimelineEvent getApplicationEvent(TimelineEntity te,
+ String eventId) {
+ if (isApplicationEntity(te)) {
+ for (TimelineEvent event : te.getEvents()) {
+ if (event.getId().equals(eventId)) {
+ return event;
+ }
+ }
+ }
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage
+ * .TimelineWriter#aggregate
+ * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
+ * org.apache
+ * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
+ */
@Override
public TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException {
return null;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
+ * ()
+ */
@Override
public void flush() throws IOException {
// flush all buffered mutators
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
index 80fcf8c..dde3911 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -45,7 +45,7 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
* When the application was created.
*/
CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
- LongConverter.getInstance()),
+ new LongConverter()),
/**
* The version of the flow that this app belongs to.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index 0febc67..42488f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -67,8 +67,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
/**
* Metrics are stored with the metric name as the column name.
*/
- METRIC(ApplicationColumnFamily.METRICS, null,
- LongConverter.getInstance());
+ METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter());
private final ColumnHelper<ApplicationTable> column;
private final ColumnFamily<ApplicationTable> columnFamily;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index e476b21..da62fdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -18,6 +18,12 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
/**
* Represents a rowkey for the application table.
*/
@@ -27,6 +33,8 @@ public class ApplicationRowKey {
private final String flowName;
private final Long flowRunId;
private final String appId;
+ private final KeyConverter<ApplicationRowKey> appRowKeyConverter =
+ new ApplicationRowKeyConverter();
public ApplicationRowKey(String clusterId, String userId, String flowName,
Long flowRunId, String appId) {
@@ -58,60 +66,141 @@ public class ApplicationRowKey {
}
/**
- * Constructs a row key prefix for the application table as follows:
- * {@code clusterId!userName!flowName!}.
- *
- * @param clusterId Cluster Id.
- * @param userId User Id.
- * @param flowName Flow Name.
- * @return byte array with the row key prefix
- */
- public static byte[] getRowKeyPrefix(String clusterId, String userId,
- String flowName) {
- return ApplicationRowKeyConverter.getInstance().encode(
- new ApplicationRowKey(clusterId, userId, flowName, null, null));
- }
-
- /**
- * Constructs a row key prefix for the application table as follows:
- * {@code clusterId!userName!flowName!flowRunId!}.
- *
- * @param clusterId Cluster Id.
- * @param userId User Id.
- * @param flowName Flow Name.
- * @param flowRunId Run Id for the flow.
- * @return byte array with the row key prefix
- */
- public static byte[] getRowKeyPrefix(String clusterId, String userId,
- String flowName, Long flowRunId) {
- return ApplicationRowKeyConverter.getInstance().encode(
- new ApplicationRowKey(clusterId, userId, flowName, flowRunId, null));
- }
-
- /**
* Constructs a row key for the application table as follows:
* {@code clusterId!userName!flowName!flowRunId!AppId}.
*
- * @param clusterId Cluster Id.
- * @param userId User Id.
- * @param flowName Flow Name.
- * @param flowRunId Run Id for the flow.
- * @param appId App Id.
* @return byte array with the row key
*/
- public static byte[] getRowKey(String clusterId, String userId,
- String flowName, Long flowRunId, String appId) {
- return ApplicationRowKeyConverter.getInstance().encode(
- new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId));
+ public byte[] getRowKey() {
+ return appRowKeyConverter.encode(this);
}
/**
* Given the raw row key as bytes, returns the row key as an object.
*
- * @param rowKey Byte representation of row key.
+ * @param rowKey Byte representation of row key.
* @return An <cite>ApplicationRowKey</cite> object.
*/
public static ApplicationRowKey parseRowKey(byte[] rowKey) {
- return ApplicationRowKeyConverter.getInstance().decode(rowKey);
+ return new ApplicationRowKeyConverter().decode(rowKey);
}
+
+ /**
+ * Encodes and decodes row keys for the application table. The row key is of
+ * the form: clusterId!userName!flowName!flowRunId!appId. flowRunId is a
+ * long, appId is encoded and decoded using {@link AppIdKeyConverter} and
+ * the rest are strings.
+ */
+ private static final class ApplicationRowKeyConverter implements
+ KeyConverter<ApplicationRowKey> {
+
+ private final KeyConverter<String> appIDKeyConverter =
+ new AppIdKeyConverter();
+
+ /**
+ * Intended for use in ApplicationRowKey only.
+ */
+ private ApplicationRowKeyConverter() {
+ }
+
+ /**
+ * An application row key is of the form
+ * clusterId!userName!flowName!flowRunId!appId, with each segment separated
+ * by !. The sizes below indicate the sizes of each of these segments in
+ * sequence. clusterId, userName and flowName are strings. flowRunId is a
+ * long, hence 8 bytes in size. appId is represented as 12 bytes: the
+ * cluster timestamp part of the appId takes 8 bytes (long) and the
+ * sequence id takes 4 bytes (int). Strings are variable in size (i.e. they
+ * end whenever a separator is encountered). This is used while decoding
+ * and helps in determining where to split.
+ */
+ private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+ Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+ AppIdKeyConverter.getKeySize() };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes ApplicationRowKey object into a byte array with each
+ * component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
+ * This leads to an application table row key of the form
+ * clusterId!userName!flowName!flowRunId!appId. If flowRunId in the passed
+ * ApplicationRowKey object is null (and the fields preceding it, i.e.
+ * clusterId, userId and flowName, are not null), this returns a row key
+ * prefix of the form clusterId!userName!flowName!, and if appId in
+ * ApplicationRowKey is null (while the other four components are all not
+ * null), this returns a row key prefix of the form
+ * clusterId!userName!flowName!flowRunId!. flowRunId is inverted while
+ * encoding, as it helps maintain a descending order for row keys in the
+ * application table.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#encode(java.lang.Object)
+ */
+ @Override
+ public byte[] encode(ApplicationRowKey rowKey) {
+ byte[] cluster =
+ Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+ Separator.TAB, Separator.QUALIFIERS);
+ byte[] user =
+ Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS);
+ byte[] flow =
+ Separator.encode(rowKey.getFlowName(), Separator.SPACE,
+ Separator.TAB, Separator.QUALIFIERS);
+ byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
+ // Note that flowRunId is a long, so we can't encode them all at the same
+ // time.
+ if (rowKey.getFlowRunId() == null) {
+ return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+ }
+ byte[] second =
+ Bytes.toBytes(LongConverter.invertLong(
+ rowKey.getFlowRunId()));
+ if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
+ return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
+ }
+ byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
+ return Separator.QUALIFIERS.join(first, second, third);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Decodes an application row key of the form
+ * clusterId!userName!flowName!flowRunId!appId represented in byte format
+ * and converts it into an ApplicationRowKey object. flowRunId is inverted
+ * while decoding, as it was inverted while encoding.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#decode(byte[])
+ */
+ @Override
+ public ApplicationRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 5) {
+ throw new IllegalArgumentException("the row key is not valid for "
+ + "an application");
+ }
+ String clusterId =
+ Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String userId =
+ Separator.decode(Bytes.toString(rowKeyComponents[1]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String flowName =
+ Separator.decode(Bytes.toString(rowKeyComponents[2]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ Long flowRunId =
+ LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+ String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
+ return new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+ appId);
+ }
+ }
+
}
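
A minimal usage sketch of the reworked key class may be useful here. It
assumes ApplicationRowKey exposes a public five-argument constructor (seen
in decode above), getRowKey() (used by ApplicationRowKeyPrefix below) and a
static parseRowKey(byte[]) mirroring the AppToFlowRowKey change further
down; all literal values are illustrative:

    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;

    public class ApplicationRowKeyRoundTrip {
      public static void main(String[] args) {
        ApplicationRowKey rowKey = new ApplicationRowKey("cluster1", "user1",
            "flow_name", 1002345678919L, "application_1465785237000_0001");
        // flowRunId is inverted during encoding, so newer runs sort first.
        byte[] encoded = rowKey.getRowKey();
        ApplicationRowKey decoded = ApplicationRowKey.parseRowKey(encoded);
        System.out.println(decoded.getFlowRunId()); // 1002345678919
      }
    }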
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
deleted file mode 100644
index 3b054a5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for application table.
- * The row key is of the form : clusterId!userName!flowName!flowRunId!appId.
- * flowRunId is a long, appId is encoded and decoded using
- * {@link AppIdKeyConverter} and rest are strings.
- */
-public final class ApplicationRowKeyConverter implements
- KeyConverter<ApplicationRowKey> {
- private static final ApplicationRowKeyConverter INSTANCE =
- new ApplicationRowKeyConverter();
-
- public static ApplicationRowKeyConverter getInstance() {
- return INSTANCE;
- }
-
- private ApplicationRowKeyConverter() {
- }
-
- // Application row key is of the form
- // clusterId!userName!flowName!flowRunId!appId with each segment separated
- // by !. The sizes below indicate sizes of each one of these segements in
- // sequence. clusterId, userName and flowName are strings. flowrunId is a long
- // hence 8 bytes in size. app id is represented as 12 bytes with cluster
- // timestamp part of appid being 8 bytes(long) and seq id being 4 bytes(int).
- // Strings are variable in size (i.e. end whenever separator is encountered).
- // This is used while decoding and helps in determining where to split.
- private static final int[] SEGMENT_SIZES = {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize() };
-
- /*
- * (non-Javadoc)
- *
- * Encodes ApplicationRowKey object into a byte array with each
- * component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
- * This leads to an application table row key of the form
- * clusterId!userName!flowName!flowRunId!appId
- * If flowRunId in passed ApplicationRowKey object is null (and the fields
- * preceding it i.e. clusterId, userId and flowName are not null), this
- * returns a row key prefix of the form clusterId!userName!flowName! and if
- * appId in ApplicationRowKey is null (other 4 components are not null), this
- * returns a row key prefix of the form clusterId!userName!flowName!flowRunId!
- * flowRunId is inverted while encoding as it helps maintain a descending
- * order for row keys in application table.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #encode(java.lang.Object)
- */
- @Override
- public byte[] encode(ApplicationRowKey rowKey) {
- byte[] cluster = Separator.encode(rowKey.getClusterId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] user = Separator.encode(rowKey.getUserId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] flow = Separator.encode(rowKey.getFlowName(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- if (rowKey.getFlowRunId() == null) {
- return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
- }
- byte[] second = Bytes.toBytes(
- TimelineStorageUtils.invertLong(rowKey.getFlowRunId()));
- if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
- return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
- }
- byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
- return Separator.QUALIFIERS.join(first, second, third);
- }
-
- /*
- * (non-Javadoc)
- *
- * Decodes an application row key of the form
- * clusterId!userName!flowName!flowRunId!appId represented in byte format and
- * converts it into an ApplicationRowKey object.flowRunId is inverted while
- * decoding as it was inverted while encoding.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #decode(byte[])
- */
- @Override
- public ApplicationRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 5) {
- throw new IllegalArgumentException("the row key is not valid for " +
- "an application");
- }
- String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- Long flowRunId =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
- String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
- return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
new file mode 100644
index 0000000..f61b0e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey (without the appId, or without the flowRunId
+ * and the appId) for the application table.
+ */
+public class ApplicationRowKeyPrefix extends ApplicationRowKey implements
+ RowKeyPrefix<ApplicationRowKey> {
+
+ /**
+ * Creates a prefix which generates the following rowKeyPrefixes for the
+ * application table: {@code clusterId!userName!flowName!}.
+ *
+ * @param clusterId the cluster on which applications ran
+ * @param userId the user that ran applications
+ * @param flowName the name of the flow that was run by the user on the
+ * cluster
+ */
+ public ApplicationRowKeyPrefix(String clusterId, String userId,
+ String flowName) {
+ super(clusterId, userId, flowName, null, null);
+ }
+
+ /**
+ * Creates a prefix which generates the following rowKeyPrefixes for the
+ * application table: {@code clusterId!userName!flowName!flowRunId!}.
+ *
+ * @param clusterId identifying the cluster
+ * @param userId identifying the user
+ * @param flowName identifying the flow
+ * @param flowRunId identifying the instance of this flow
+ */
+ public ApplicationRowKeyPrefix(String clusterId, String userId,
+ String flowName, Long flowRunId) {
+ super(clusterId, userId, flowName, flowRunId, null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * RowKeyPrefix#getRowKeyPrefix()
+ */
+ @Override
+ public byte[] getRowKeyPrefix() {
+ return super.getRowKey();
+ }
+
+}
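
Since the prefix class simply reuses the parent's encoder with trailing null
components, a scan can be driven directly off getRowKeyPrefix(). A short
sketch, assuming the standard HBase client Scan API; the literal values are
placeholders:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;

    public class ApplicationPrefixScan {
      public static Scan scanOneFlowRun() {
        // Covers every application row for one run of cluster1/user1/flow_name.
        // The encoded prefix ends at a component boundary (trailing separator),
        // so it cannot partially match a longer flowName.
        byte[] prefix = new ApplicationRowKeyPrefix("cluster1", "user1",
            "flow_name", 1002345678919L).getRowKeyPrefix();
        return new Scan().setRowPrefixFilter(prefix);
      }
    }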
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index 6a38e32..8df4407 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -17,12 +17,19 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
/**
* Represents a rowkey for the app_flow table.
*/
public class AppToFlowRowKey {
private final String clusterId;
private final String appId;
+ private final KeyConverter<AppToFlowRowKey> appToFlowRowKeyConverter =
+ new AppToFlowRowKeyConverter();
public AppToFlowRowKey(String clusterId, String appId) {
this.clusterId = clusterId;
@@ -41,13 +48,10 @@ public class AppToFlowRowKey {
* Constructs a row key prefix for the app_flow table as follows:
* {@code clusterId!AppId}.
*
- * @param clusterId Cluster Id.
- * @param appId Application Id.
* @return byte array with the row key
*/
- public static byte[] getRowKey(String clusterId, String appId) {
- return AppToFlowRowKeyConverter.getInstance().encode(
- new AppToFlowRowKey(clusterId, appId));
+ public byte[] getRowKey() {
+ return appToFlowRowKeyConverter.encode(this);
}
/**
@@ -57,6 +61,83 @@ public class AppToFlowRowKey {
* @return an <cite>AppToFlowRowKey</cite> object.
*/
public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
- return AppToFlowRowKeyConverter.getInstance().decode(rowKey);
+ return new AppToFlowRowKeyConverter().decode(rowKey);
+ }
+
+ /**
+ * Encodes and decodes row key for the app_flow table. The row key is of
+ * the form clusterId!appId. clusterId is a string and appId is
+ * encoded/decoded using {@link AppIdKeyConverter}.
+ */
+ private static final class AppToFlowRowKeyConverter implements
+ KeyConverter<AppToFlowRowKey> {
+
+ private final KeyConverter<String> appIDKeyConverter =
+ new AppIdKeyConverter();
+
+ /**
+ * Intended for use in AppToFlowRowKey only.
+ */
+ private AppToFlowRowKeyConverter() {
+ }
+
+ /**
+ * App to flow row key is of the form clusterId!appId with the 2 segments
+ * separated by !. The sizes below indicate the size of each of these
+ * segments in sequence. clusterId is a string. appId is represented as
+ * 12 bytes, with the cluster timestamp part of the appId taking 8 bytes
+ * (long) and the sequence id taking 4 bytes (int). Strings are variable
+ * in size (i.e. they end whenever the separator is encountered). This is
+ * used while decoding and helps in determining where to split.
+ */
+ private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+ Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes an AppToFlowRowKey object into a byte array with each
+ * component/field in AppToFlowRowKey separated by Separator#QUALIFIERS.
+ * This leads to an app_flow table row key of the form clusterId!appId.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#encode(java.lang.Object)
+ */
+ @Override
+ public byte[] encode(AppToFlowRowKey rowKey) {
+ byte[] first =
+ Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+ Separator.TAB, Separator.QUALIFIERS);
+ byte[] second = appIDKeyConverter.encode(rowKey.getAppId());
+ return Separator.QUALIFIERS.join(first, second);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Decodes an app to flow row key of the form clusterId!appId represented
+ * in byte format and converts it into an AppToFlowRowKey object.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#decode(byte[])
+ */
+ @Override
+ public AppToFlowRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 2) {
+ throw new IllegalArgumentException("the row key is not valid for "
+ + "the app-to-flow table");
+ }
+ String clusterId =
+ Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String appId = appIDKeyConverter.decode(rowKeyComponents[1]);
+ return new AppToFlowRowKey(clusterId, appId);
+ }
}
}
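
The same pattern, exercised end to end using only the methods visible in
this hunk (values illustrative):

    import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;

    public class AppToFlowRowKeyDemo {
      public static void main(String[] args) {
        AppToFlowRowKey key =
            new AppToFlowRowKey("cluster1", "application_1465785237000_0001");
        byte[] bytes = key.getRowKey();            // clusterId!appId
        AppToFlowRowKey parsed = AppToFlowRowKey.parseRowKey(bytes);
        System.out.println(parsed.getAppId());     // the original appId
      }
    }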
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
deleted file mode 100644
index 0f0b879d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-
-/**
- * Encodes and decodes row key for app_flow table.
- * The row key is of the form : clusterId!appId.
- * clusterId is a string and appId is encoded/decoded using
- * {@link AppIdKeyConverter}.
- */
-public final class AppToFlowRowKeyConverter
- implements KeyConverter<AppToFlowRowKey> {
- private static final AppToFlowRowKeyConverter INSTANCE =
- new AppToFlowRowKeyConverter();
-
- public static AppToFlowRowKeyConverter getInstance() {
- return INSTANCE;
- }
-
- private AppToFlowRowKeyConverter() {
- }
-
- // App to flow row key is of the form clusterId!appId with the 2 segments
- // separated by !. The sizes below indicate sizes of both of these segments
- // in sequence. clusterId is a string. appId is represented as 12 bytes with
- // cluster Timestamp part of appid being 8 bytes(long) and seq id being 4
- // bytes(int).
- // Strings are variable in size (i.e. end whenever separator is encountered).
- // This is used while decoding and helps in determining where to split.
- private static final int[] SEGMENT_SIZES = {
- Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
-
- /*
- * (non-Javadoc)
- *
- * Encodes AppToFlowRowKey object into a byte array with each component/field
- * in AppToFlowRowKey separated by Separator#QUALIFIERS. This leads to an
- * app to flow table row key of the form clusterId!appId
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #encode(java.lang.Object)
- */
- @Override
- public byte[] encode(AppToFlowRowKey rowKey) {
- byte[] first = Separator.encode(rowKey.getClusterId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
- return Separator.QUALIFIERS.join(first, second);
- }
-
- /*
- * (non-Javadoc)
- *
- * Decodes an app to flow row key of the form clusterId!appId represented in
- * byte format and converts it into an AppToFlowRowKey object.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #decode(byte[])
- */
- @Override
- public AppToFlowRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 2) {
- throw new IllegalArgumentException("the row key is not valid for " +
- "the app-to-flow table");
- }
- String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]);
- return new AppToFlowRowKey(clusterId, appId);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
index a173b0f..f5f7aa6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
@@ -28,13 +28,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
* (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes).
*/
public final class AppIdKeyConverter implements KeyConverter<String> {
- private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter();
- public static AppIdKeyConverter getInstance() {
- return INSTANCE;
- }
-
- private AppIdKeyConverter() {
+ public AppIdKeyConverter() {
}
/*
@@ -58,7 +53,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
byte[] appIdBytes = new byte[getKeySize()];
byte[] clusterTs = Bytes.toBytes(
- TimelineStorageUtils.invertLong(appId.getClusterTimestamp()));
+ LongConverter.invertLong(appId.getClusterTimestamp()));
System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
@@ -83,7 +78,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
if (appIdBytes.length != getKeySize()) {
throw new IllegalArgumentException("Invalid app id in byte format");
}
- long clusterTs = TimelineStorageUtils.invertLong(
+ long clusterTs = LongConverter.invertLong(
Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
int seqId = TimelineStorageUtils.invertInt(
Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
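
With the singleton gone, callers construct the converter directly. A small
sketch of the fixed-width encoding this hunk operates on (the appId literal
is illustrative):

    import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;

    public class AppIdKeyConverterDemo {
      public static void main(String[] args) {
        AppIdKeyConverter converter = new AppIdKeyConverter();
        // 8-byte inverted cluster timestamp + 4-byte inverted sequence id.
        byte[] encoded = converter.encode("application_1465785237000_0001");
        System.out.println(encoded.length == AppIdKeyConverter.getKeySize());
        System.out.println(converter.decode(encoded));
      }
    }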
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
index 6018f86..8445575 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
@@ -26,6 +26,8 @@ public class EventColumnName {
private final String id;
private final Long timestamp;
private final String infoKey;
+ private final KeyConverter<EventColumnName> eventColumnNameConverter =
+ new EventColumnNameConverter();
public EventColumnName(String id, Long timestamp, String infoKey) {
this.id = id;
@@ -45,4 +47,17 @@ public class EventColumnName {
return infoKey;
}
+ /**
+ * @return a byte array with each component/field separated by
+ * Separator#VALUES. This leads to an event column name of the form
+ * eventId=timestamp=infoKey. If both timestamp and infoKey are null,
+ * then a qualifier of the form eventId= is returned; if only infoKey
+ * is null, then a qualifier of the form eventId=timestamp= is
+ * returned. These prefix forms are useful for queries that intend to
+ * retrieve more than one specific column name.
+ */
+ public byte[] getColumnQualifier() {
+ return eventColumnNameConverter.encode(this);
+ }
+
}
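
Given that contract, the three qualifier shapes could be produced as
follows (event id, timestamp and info key are illustrative):

    import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;

    public class EventColumnNameDemo {
      public static void main(String[] args) {
        // Full column name: eventId=timestamp=infoKey.
        byte[] full = new EventColumnName("YARN_APPLICATION_CREATED",
            1465785237000L, "diagnostics").getColumnQualifier();
        // Prefix over all info keys of one event occurrence: eventId=timestamp=
        byte[] atTime = new EventColumnName("YARN_APPLICATION_CREATED",
            1465785237000L, null).getColumnQualifier();
        // Prefix over every occurrence of the event: eventId=
        byte[] idOnly = new EventColumnName("YARN_APPLICATION_CREATED",
            null, null).getColumnQualifier();
        System.out.println(full.length + " " + atTime.length + " " + idOnly.length);
      }
    }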