You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/12/10 00:39:13 UTC
[1/2] hadoop git commit: YARN-5925. Extract hbase-backend-exclusive
utility methods from TimelineStorageUtil. Contributed by Haibo Chen.
Repository: hadoop
Updated Branches:
refs/heads/YARN-5355 6217b87f4 -> 8288030cb
refs/heads/YARN-5355-branch-2 385d8fae8 -> cf8e3a8f3
YARN-5925. Extract hbase-backend-exclusive utility methods from TimelineStorageUtil. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8288030c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8288030c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8288030c
Branch: refs/heads/YARN-5355
Commit: 8288030cb4aa3b5a9425cc0a3f6df03a3eae1dfb
Parents: 6217b87
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Dec 9 16:30:49 2016 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Fri Dec 9 16:30:49 2016 -0800
----------------------------------------------------------------------
...stTimelineReaderWebServicesHBaseStorage.java | 6 +-
.../flow/TestHBaseStorageFlowActivity.java | 12 +-
.../flow/TestHBaseStorageFlowRunCompaction.java | 44 +--
.../storage/HBaseTimelineReaderImpl.java | 4 +-
.../storage/HBaseTimelineWriterImpl.java | 4 +-
.../storage/TimelineSchemaCreator.java | 4 +-
.../storage/common/AppIdKeyConverter.java | 5 +-
.../common/HBaseTimelineStorageUtils.java | 306 +++++++++++++++++++
.../storage/common/LongConverter.java | 2 +-
.../storage/common/TimelineStorageUtils.java | 265 ----------------
.../storage/flow/FlowActivityColumnPrefix.java | 10 +-
.../storage/flow/FlowActivityRowKey.java | 4 +-
.../storage/flow/FlowRunColumn.java | 6 +-
.../storage/flow/FlowRunColumnPrefix.java | 6 +-
.../storage/flow/FlowRunCoprocessor.java | 4 +-
.../storage/flow/FlowScanner.java | 13 +-
.../storage/reader/EntityTypeReader.java | 6 +-
.../storage/common/TestRowKeys.java | 2 +-
18 files changed, 374 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 2ed5d96..db1c1cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.AfterClass;
@@ -78,7 +78,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
private static HBaseTestingUtility util;
private static long ts = System.currentTimeMillis();
private static long dayTs =
- TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+ HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
@BeforeClass
public static void setup() throws Exception {
@@ -984,7 +984,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
assertEquals(1, entities.size());
long firstFlowActivity =
- TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
+ HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 1db0649..2db01a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -173,7 +173,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
+ Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@@ -305,7 +305,8 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+ Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
+ appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@@ -390,7 +391,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName());
long dayTs =
- TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+ HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivity.getDate().getTime());
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(3, flowRuns.size());
@@ -445,7 +446,8 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+ Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
+ appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
Map<byte[], byte[]> values = result
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 5fe8b1b..644e31a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -54,8 +54,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -416,8 +416,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
@@ -426,8 +426,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
- Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
+ Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
@@ -436,8 +436,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
- Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
+ Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
@@ -446,8 +446,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
- Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
+ Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
currentColumnCells.add(c4);
List<Cell> cells =
@@ -516,7 +516,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
@@ -530,7 +530,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
@@ -607,7 +607,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
@@ -621,7 +621,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinalNotExpire++;
@@ -635,7 +635,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
@@ -692,8 +692,8 @@ public class TestHBaseStorageFlowRunCompaction {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- 120L, Bytes.toBytes(cellValue1), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, 120L, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
@@ -703,8 +703,8 @@ public class TestHBaseStorageFlowRunCompaction {
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
- Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- 130L, Bytes.toBytes(cellValue2), tagByteArray);
+ Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, 130L, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
new LongConverter(), currentTimestamp);
@@ -750,8 +750,8 @@ public class TestHBaseStorageFlowRunCompaction {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- 120L, Bytes.toBytes(1110L), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, 120L, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
@@ -788,8 +788,8 @@ public class TestHBaseStorageFlowRunCompaction {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
new LongConverter(), currentTimestamp);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 233fc70..ce20113 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
@@ -55,7 +55,7 @@ public class HBaseTimelineReaderImpl
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
- hbaseConf = TimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+ hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index df1e460..a59845a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -113,7 +113,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
Configuration hbaseConf =
- TimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+ HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 176562b..fa0d479 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
@@ -69,7 +69,7 @@ public final class TimelineSchemaCreator {
public static void main(String[] args) throws Exception {
Configuration hbaseConf =
- TimelineStorageUtils.getTimelineServiceHBaseConf(null);
+ HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
// Grab input args and allow for -Dxyz style arguments
String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
.getRemainingArgs();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
index 4cb46e6..c165801 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
@@ -54,7 +54,8 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
byte[] clusterTs = Bytes.toBytes(
LongConverter.invertLong(appId.getClusterTimestamp()));
System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
- byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
+ byte[] seqId = Bytes.toBytes(
+ HBaseTimelineStorageUtils.invertInt(appId.getId()));
System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
return appIdBytes;
}
@@ -79,7 +80,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
}
long clusterTs = LongConverter.invertLong(
Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
- int seqId = TimelineStorageUtils.invertInt(
+ int seqId = HBaseTimelineStorageUtils.invertInt(
Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
return ApplicationId.newInstance(clusterTs, seqId).toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
new file mode 100644
index 0000000..afe4d6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bunch of utility functions used in HBase TimelineService backend.
+ */
+public final class HBaseTimelineStorageUtils {
+
+ /** milliseconds in one day. */
+ public static final long MILLIS_ONE_DAY = 86400000L;
+
+ private HBaseTimelineStorageUtils() {
+ }
+
+ /**
+ * Combines the input array of attributes and the input aggregation operation
+ * into a new array of attributes.
+ *
+ * @param attributes Attributes to be combined.
+ * @param aggOp Aggregation operation.
+ * @return array of combined attributes.
+ */
+ public static Attribute[] combineAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+ Attribute[] combinedAttributes = new Attribute[newLength];
+
+ if (attributes != null) {
+ System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+ }
+
+ if (aggOp != null) {
+ Attribute a2 = aggOp.getAttribute();
+ combinedAttributes[newLength - 1] = a2;
+ }
+ return combinedAttributes;
+ }
+
+ /**
+ * Returns a number for the new array size. The new array is the combination
+ * of input array of attributes and the input aggregation operation.
+ *
+ * @param attributes Attributes.
+ * @param aggOp Aggregation operation.
+ * @return the size for the new array
+ */
+ private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int oldLength = getAttributesLength(attributes);
+ int aggLength = getAppOpLength(aggOp);
+ return oldLength + aggLength;
+ }
+
+ private static int getAppOpLength(AggregationOperation aggOp) {
+ if (aggOp != null) {
+ return 1;
+ }
+ return 0;
+ }
+
+ private static int getAttributesLength(Attribute[] attributes) {
+ if (attributes != null) {
+ return attributes.length;
+ }
+ return 0;
+ }
+
+ /**
+ * Returns the first seen aggregation operation as seen in the list of input
+ * tags or null otherwise.
+ *
+ * @param tags list of HBase tags.
+ * @return AggregationOperation
+ */
+ public static AggregationOperation getAggregationOperationFromTagsList(
+ List<Tag> tags) {
+ for (AggregationOperation aggOp : AggregationOperation.values()) {
+ for (Tag tag : tags) {
+ if (tag.getType() == aggOp.getTagType()) {
+ return aggOp;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a {@link Tag} from the input attribute.
+ *
+ * @param attribute Attribute from which tag has to be fetched.
+ * @return a HBase Tag.
+ */
+ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
+ // attribute could be either an Aggregation Operation or
+ // an Aggregation Dimension
+ // Get the Tag type from either
+ AggregationOperation aggOp = AggregationOperation
+ .getAggregationOperation(attribute.getKey());
+ if (aggOp != null) {
+ Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+ return t;
+ }
+
+ AggregationCompactionDimension aggCompactDim =
+ AggregationCompactionDimension.getAggregationCompactionDimension(
+ attribute.getKey());
+ if (aggCompactDim != null) {
+ Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ return t;
+ }
+ return null;
+ }
+
+ /**
+ * creates a new cell based on the input cell but with the new value.
+ *
+ * @param origCell Original cell
+ * @param newValue new cell value
+ * @return cell
+ * @throws IOException while creating new cell.
+ */
+ public static Cell createNewCell(Cell origCell, byte[] newValue)
+ throws IOException {
+ return CellUtil.createCell(CellUtil.cloneRow(origCell),
+ CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+ origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+ }
+
+ /**
+ * creates a cell with the given inputs.
+ *
+ * @param row row of the cell to be created
+ * @param family column family name of the new cell
+ * @param qualifier qualifier for the new cell
+ * @param ts timestamp of the new cell
+ * @param newValue value of the new cell
+ * @param tags tags in the new cell
+ * @return cell
+ * @throws IOException while creating the cell.
+ */
+ public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
+ long ts, byte[] newValue, byte[] tags) throws IOException {
+ return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
+ newValue, tags);
+ }
+
+ /**
+ * returns app id from the list of tags.
+ *
+ * @param tags cell tags to be looked into
+ * @return App Id as the AggregationCompactionDimension
+ */
+ public static String getAggregationCompactionDimension(List<Tag> tags) {
+ String appId = null;
+ for (Tag t : tags) {
+ if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+ .getType()) {
+ appId = Bytes.toString(t.getValue());
+ return appId;
+ }
+ }
+ return appId;
+ }
+
+ /**
+ * Converts an int into it's inverse int to be used in (row) keys
+ * where we want to have the largest int value in the top of the table
+ * (scans start at the largest int first).
+ *
+ * @param key value to be inverted so that the latest version will be first in
+ * a scan.
+ * @return inverted int
+ */
+ public static int invertInt(int key) {
+ return Integer.MAX_VALUE - key;
+ }
+
+ /**
+ * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
+ * for a given input timestamp.
+ *
+ * @param ts Timestamp.
+ * @return timestamp of that day's beginning (midnight)
+ */
+ public static long getTopOfTheDayTimestamp(long ts) {
+ long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+ return dayTimestamp;
+ }
+
+ /**
+ * @param conf Yarn configuration. Used to see if there is an explicit config
+ * pointing to the HBase config file to read. If null then a new
+ * HBase configuration will be returned.
+ * @return a configuration with the HBase configuration from the classpath,
+ * optionally overwritten by the timeline service configuration URL if
+ * specified.
+ * @throws MalformedURLException if a timeline service HBase configuration URL
+ * is specified but is a malformed URL.
+ */
+ public static Configuration getTimelineServiceHBaseConf(Configuration conf)
+ throws MalformedURLException {
+ Configuration hbaseConf;
+
+ if (conf == null) {
+ return HBaseConfiguration.create();
+ }
+
+ String timelineServiceHBaseConfFileURL =
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
+ if (timelineServiceHBaseConfFileURL != null
+ && timelineServiceHBaseConfFileURL.length() > 0) {
+ // create a clone so that we don't mess with out input one
+ hbaseConf = new Configuration(conf);
+ Configuration plainHBaseConf = new Configuration(false);
+ URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
+ plainHBaseConf.addResource(hbaseSiteXML);
+ HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
+ } else {
+ // default to what is on the classpath
+ hbaseConf = HBaseConfiguration.create(conf);
+ }
+ return hbaseConf;
+ }
+
+ /**
+ * Given a row key prefix stored in a byte array, return a byte array for its
+ * immediate next row key.
+ *
+ * @param rowKeyPrefix The provided row key prefix, represented in an array.
+ * @return the closest next row key of the provided row key.
+ */
+ public static byte[] calculateTheClosestNextRowKeyForPrefix(
+ byte[] rowKeyPrefix) {
+ // Essentially we are treating it like an 'unsigned very very long' and
+ // doing +1 manually.
+ // Search for the place where the trailing 0xFFs start
+ int offset = rowKeyPrefix.length;
+ while (offset > 0) {
+ if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+ break;
+ }
+ offset--;
+ }
+
+ if (offset == 0) {
+ // We got an 0xFFFF... (only FFs) stopRow value which is
+ // the last possible prefix before the end of the table.
+ // So set it to stop at the 'end of the table'
+ return HConstants.EMPTY_END_ROW;
+ }
+
+ // Copy the right length of the original
+ byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+ // And increment the last one
+ newStopRow[newStopRow.length - 1]++;
+ return newStopRow;
+ }
+
+ /**
+ * Checks if passed object is of integral type(Short/Integer/Long).
+ *
+ * @param obj Object to be checked.
+ * @return true if object passed is of type Short or Integer or Long, false
+ * otherwise.
+ */
+ public static boolean isIntegralValue(Object obj) {
+ return (obj instanceof Short) || (obj instanceof Integer) ||
+ (obj instanceof Long);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
index 600601a..6ab69f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -40,7 +40,7 @@ public final class LongConverter implements NumericValueConverter,
@Override
public byte[] encodeValue(Object value) throws IOException {
- if (!TimelineStorageUtils.isIntegralValue(value)) {
+ if (!HBaseTimelineStorageUtils.isIntegralValue(value)) {
throw new IOException("Expected integral value");
}
return Bytes.toBytes(((Number)value).longValue());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 4b5fa07..9b83659 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,32 +18,18 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
@@ -52,9 +38,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* A bunch of utility functions used across TimelineReader and TimelineWriter.
@@ -67,133 +50,6 @@ public final class TimelineStorageUtils {
private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
- /** milliseconds in one day. */
- public static final long MILLIS_ONE_DAY = 86400000L;
-
- /**
- * Converts an int into it's inverse int to be used in (row) keys
- * where we want to have the largest int value in the top of the table
- * (scans start at the largest int first).
- *
- * @param key value to be inverted so that the latest version will be first in
- * a scan.
- * @return inverted int
- */
- public static int invertInt(int key) {
- return Integer.MAX_VALUE - key;
- }
-
- /**
- * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
- * for a given input timestamp.
- *
- * @param ts Timestamp.
- * @return timestamp of that day's beginning (midnight)
- */
- public static long getTopOfTheDayTimestamp(long ts) {
- long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
- return dayTimestamp;
- }
-
- /**
- * Combines the input array of attributes and the input aggregation operation
- * into a new array of attributes.
- *
- * @param attributes Attributes to be combined.
- * @param aggOp Aggregation operation.
- * @return array of combined attributes.
- */
- public static Attribute[] combineAttributes(Attribute[] attributes,
- AggregationOperation aggOp) {
- int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
- Attribute[] combinedAttributes = new Attribute[newLength];
-
- if (attributes != null) {
- System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
- }
-
- if (aggOp != null) {
- Attribute a2 = aggOp.getAttribute();
- combinedAttributes[newLength - 1] = a2;
- }
- return combinedAttributes;
- }
-
- /**
- * Returns a number for the new array size. The new array is the combination
- * of input array of attributes and the input aggregation operation.
- *
- * @param attributes Attributes.
- * @param aggOp Aggregation operation.
- * @return the size for the new array
- */
- private static int getNewLengthCombinedAttributes(Attribute[] attributes,
- AggregationOperation aggOp) {
- int oldLength = getAttributesLength(attributes);
- int aggLength = getAppOpLength(aggOp);
- return oldLength + aggLength;
- }
-
- private static int getAppOpLength(AggregationOperation aggOp) {
- if (aggOp != null) {
- return 1;
- }
- return 0;
- }
-
- private static int getAttributesLength(Attribute[] attributes) {
- if (attributes != null) {
- return attributes.length;
- }
- return 0;
- }
-
- /**
- * Returns the first seen aggregation operation as seen in the list of input
- * tags or null otherwise.
- *
- * @param tags list of HBase tags.
- * @return AggregationOperation
- */
- public static AggregationOperation getAggregationOperationFromTagsList(
- List<Tag> tags) {
- for (AggregationOperation aggOp : AggregationOperation.values()) {
- for (Tag tag : tags) {
- if (tag.getType() == aggOp.getTagType()) {
- return aggOp;
- }
- }
- }
- return null;
- }
-
- /**
- * Creates a {@link Tag} from the input attribute.
- *
- * @param attribute Attribute from which tag has to be fetched.
- * @return a HBase Tag.
- */
- public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
- // attribute could be either an Aggregation Operation or
- // an Aggregation Dimension
- // Get the Tag type from either
- AggregationOperation aggOp = AggregationOperation
- .getAggregationOperation(attribute.getKey());
- if (aggOp != null) {
- Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
- return t;
- }
-
- AggregationCompactionDimension aggCompactDim =
- AggregationCompactionDimension.getAggregationCompactionDimension(
- attribute.getKey());
- if (aggCompactDim != null) {
- Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
- return t;
- }
- return null;
- }
-
/**
* Matches key-values filter. Used for relatesTo/isRelatedTo filters.
*
@@ -520,125 +376,4 @@ public final class TimelineStorageUtils {
(obj instanceof Long);
}
- /**
- * creates a new cell based on the input cell but with the new value.
- *
- * @param origCell Original cell
- * @param newValue new cell value
- * @return cell
- * @throws IOException while creating new cell.
- */
- public static Cell createNewCell(Cell origCell, byte[] newValue)
- throws IOException {
- return CellUtil.createCell(CellUtil.cloneRow(origCell),
- CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
- origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
- }
-
- /**
- * creates a cell with the given inputs.
- *
- * @param row row of the cell to be created
- * @param family column family name of the new cell
- * @param qualifier qualifier for the new cell
- * @param ts timestamp of the new cell
- * @param newValue value of the new cell
- * @param tags tags in the new cell
- * @return cell
- * @throws IOException while creating the cell.
- */
- public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
- long ts, byte[] newValue, byte[] tags) throws IOException {
- return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
- newValue, tags);
- }
-
- /**
- * returns app id from the list of tags.
- *
- * @param tags cell tags to be looked into
- * @return App Id as the AggregationCompactionDimension
- */
- public static String getAggregationCompactionDimension(List<Tag> tags) {
- String appId = null;
- for (Tag t : tags) {
- if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
- .getType()) {
- appId = Bytes.toString(t.getValue());
- return appId;
- }
- }
- return appId;
- }
-
-
- /**
- * @param conf Yarn configuration. Used to see if there is an explicit config
- * pointing to the HBase config file to read. If null then a new
- * HBase configuration will be returned.
- * @return a configuration with the HBase configuration from the classpath,
- * optionally overwritten by the timeline service configuration URL if
- * specified.
- * @throws MalformedURLException if a timeline service HBase configuration URL
- * is specified but is a malformed URL.
- */
- public static Configuration getTimelineServiceHBaseConf(Configuration conf)
- throws MalformedURLException {
- Configuration hbaseConf;
-
- if (conf == null) {
- return HBaseConfiguration.create();
- }
-
- String timelineServiceHBaseConfFileURL =
- conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
- if (timelineServiceHBaseConfFileURL != null
- && timelineServiceHBaseConfFileURL.length() > 0) {
- // create a clone so that we don't mess with out input one
- hbaseConf = new Configuration(conf);
- Configuration plainHBaseConf = new Configuration(false);
- URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
- plainHBaseConf.addResource(hbaseSiteXML);
- HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
- } else {
- // default to what is on the classpath
- hbaseConf = HBaseConfiguration.create(conf);
- }
- return hbaseConf;
- }
-
- /**
- * Given a row key prefix stored in a byte array, return a byte array for its
- * immediate next row key.
- *
- * @param rowKeyPrefix The provided row key prefix, represented in an array.
- * @return the closest next row key of the provided row key.
- */
- public static byte[] calculateTheClosestNextRowKeyForPrefix(
- byte[] rowKeyPrefix) {
- // Essentially we are treating it like an 'unsigned very very long' and
- // doing +1 manually.
- // Search for the place where the trailing 0xFFs start
- int offset = rowKeyPrefix.length;
- while (offset > 0) {
- if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
- break;
- }
- offset--;
- }
-
- if (offset == 0) {
- // We got an 0xFFFF... (only FFs) stopRow value which is
- // the last possible prefix before the end of the table.
- // So set it to stop at the 'end of the table'
- return HConstants.EMPTY_END_ROW;
- }
-
- // Copy the right length of the original
- byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
- // And increment the last one
- newStopRow[newStopRow.length - 1]++;
- return newStopRow;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 71c3d90..439e0c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -26,9 +26,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -144,8 +144,8 @@ public enum FlowActivityColumnPrefix
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@@ -269,8 +269,8 @@ public enum FlowActivityColumnPrefix
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index d10608a..bb77e36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the flow activity table.
@@ -59,7 +59,7 @@ public class FlowActivityRowKey {
String flowName, boolean convertDayTsToTopOfDay) {
this.clusterId = clusterId;
if (convertDayTsToTopOfDay && (timestamp != null)) {
- this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+ this.dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
} else {
this.dayTs = timestamp;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 2e7a9d8..90dd345 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -113,8 +113,8 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
Object inputValue, Attribute... attributes) throws IOException {
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, aggOp);
+ Attribute[] combinedAttributes =
+ HBaseTimelineStorageUtils.combineAttributes(attributes, aggOp);
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue, combinedAttributes);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index e74282a..278d18e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -136,7 +136,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
- TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@@ -163,7 +163,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
- TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 5c7b069..122d399 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
/**
@@ -107,7 +107,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
- Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
+ Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
tags.add(t);
}
byte[] tagByteArray = Tag.fromList(tags);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6e67722..0e3c8ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -249,7 +249,7 @@ class FlowScanner implements RegionScanner, Closeable {
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
- return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
+ return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
}
/**
@@ -323,7 +323,7 @@ class FlowScanner implements RegionScanner, Closeable {
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
- String aggDim = TimelineStorageUtils
+ String aggDim = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
// if this agg dimension has already been seen,
@@ -418,7 +418,8 @@ class FlowScanner implements RegionScanner, Closeable {
sum = converter.add(sum, currentValue);
}
byte[] sumBytes = converter.encodeValue(sum);
- Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
+ Cell sumCell =
+ HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
return sumCell;
}
@@ -460,7 +461,7 @@ class FlowScanner implements RegionScanner, Closeable {
// if this is the existing flow sum cell
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
- String appId = TimelineStorageUtils
+ String appId = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
if (appId == FLOW_APP_ID) {
sum = converter.add(sum, currentValue);
@@ -502,7 +503,7 @@ class FlowScanner implements RegionScanner, Closeable {
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
- Cell sumCell = TimelineStorageUtils.createNewCell(
+ Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),
CellUtil.cloneQualifier(anyCell),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
index b69cf76..fd85878 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -153,8 +153,8 @@ public final class EntityTypeReader extends AbstractTimelineStorageReader {
System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
entityTypeEncoded.length);
- return
- TimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(currRowKey);
+ return HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+ currRowKey);
}
private ResultScanner getResult(Configuration hbaseConf, Connection conn,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8288030c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
index c4c8dce..6c6d1b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -189,7 +189,7 @@ public class TestRowKeys {
@Test
public void testFlowActivityRowKey() {
Long ts = 1459900830000L;
- Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+ Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
byte[] byteRowKey =
new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-5925. Extract hbase-backend-exclusive
utility methods from TimelineStorageUtil. Contributed by Haibo Chen.
Posted by sj...@apache.org.
YARN-5925. Extract hbase-backend-exclusive utility methods from TimelineStorageUtil. Contributed by Haibo Chen.
(cherry picked from commit 8288030cb4aa3b5a9425cc0a3f6df03a3eae1dfb)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf8e3a8f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf8e3a8f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf8e3a8f
Branch: refs/heads/YARN-5355-branch-2
Commit: cf8e3a8f399d1cffd73dd1fbb001887340330f17
Parents: 385d8fa
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Dec 9 16:30:49 2016 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Fri Dec 9 16:31:28 2016 -0800
----------------------------------------------------------------------
...stTimelineReaderWebServicesHBaseStorage.java | 6 +-
.../flow/TestHBaseStorageFlowActivity.java | 12 +-
.../flow/TestHBaseStorageFlowRunCompaction.java | 44 +--
.../storage/HBaseTimelineReaderImpl.java | 4 +-
.../storage/HBaseTimelineWriterImpl.java | 4 +-
.../storage/TimelineSchemaCreator.java | 4 +-
.../storage/common/AppIdKeyConverter.java | 5 +-
.../common/HBaseTimelineStorageUtils.java | 306 +++++++++++++++++++
.../storage/common/LongConverter.java | 2 +-
.../storage/common/TimelineStorageUtils.java | 265 ----------------
.../storage/flow/FlowActivityColumnPrefix.java | 10 +-
.../storage/flow/FlowActivityRowKey.java | 4 +-
.../storage/flow/FlowRunColumn.java | 6 +-
.../storage/flow/FlowRunColumnPrefix.java | 6 +-
.../storage/flow/FlowRunCoprocessor.java | 4 +-
.../storage/flow/FlowScanner.java | 13 +-
.../storage/reader/EntityTypeReader.java | 6 +-
.../storage/common/TestRowKeys.java | 2 +-
18 files changed, 374 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 6bbafe3..a83d2dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.AfterClass;
@@ -78,7 +78,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
private static HBaseTestingUtility util;
private static long ts = System.currentTimeMillis();
private static long dayTs =
- TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+ HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
@BeforeClass
public static void setup() throws Exception {
@@ -962,7 +962,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
assertEquals(1, entities.size());
long firstFlowActivity =
- TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
+ HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 8f073dc..97d40fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -173,7 +173,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
+ Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@@ -305,7 +305,8 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+ Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
+ appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@@ -390,7 +391,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName());
long dayTs =
- TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+ HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivity.getDate().getTime());
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(3, flowRuns.size());
@@ -445,7 +446,8 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
- Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+ Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
+ appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
Map<byte[], byte[]> values = result
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 965fc50..a4c0e44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -54,8 +54,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -416,8 +416,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
@@ -426,8 +426,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
- Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
+ Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
@@ -436,8 +436,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
- Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
+ Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
@@ -446,8 +446,8 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
- Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
+ Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
currentColumnCells.add(c4);
List<Cell> cells =
@@ -516,7 +516,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
@@ -530,7 +530,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
@@ -607,7 +607,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
@@ -621,7 +621,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinalNotExpire++;
@@ -635,7 +635,7 @@ public class TestHBaseStorageFlowRunCompaction {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
- c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
@@ -692,8 +692,8 @@ public class TestHBaseStorageFlowRunCompaction {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- 120L, Bytes.toBytes(cellValue1), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, 120L, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
@@ -703,8 +703,8 @@ public class TestHBaseStorageFlowRunCompaction {
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
- Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- 130L, Bytes.toBytes(cellValue2), tagByteArray);
+ Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, 130L, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
new LongConverter(), currentTimestamp);
@@ -750,8 +750,8 @@ public class TestHBaseStorageFlowRunCompaction {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- 120L, Bytes.toBytes(1110L), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, 120L, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
@@ -788,8 +788,8 @@ public class TestHBaseStorageFlowRunCompaction {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
- Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
- currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
+ Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+ aQualifier, currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
new LongConverter(), currentTimestamp);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 233fc70..ce20113 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
@@ -55,7 +55,7 @@ public class HBaseTimelineReaderImpl
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
- hbaseConf = TimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+ hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 1f41aaa..c1c2a5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -108,7 +108,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
Configuration hbaseConf =
- TimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+ HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 176562b..fa0d479 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
@@ -69,7 +69,7 @@ public final class TimelineSchemaCreator {
public static void main(String[] args) throws Exception {
Configuration hbaseConf =
- TimelineStorageUtils.getTimelineServiceHBaseConf(null);
+ HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
// Grab input args and allow for -Dxyz style arguments
String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
.getRemainingArgs();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
index 4cb46e6..c165801 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
@@ -54,7 +54,8 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
byte[] clusterTs = Bytes.toBytes(
LongConverter.invertLong(appId.getClusterTimestamp()));
System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
- byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
+ byte[] seqId = Bytes.toBytes(
+ HBaseTimelineStorageUtils.invertInt(appId.getId()));
System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
return appIdBytes;
}
@@ -79,7 +80,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
}
long clusterTs = LongConverter.invertLong(
Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
- int seqId = TimelineStorageUtils.invertInt(
+ int seqId = HBaseTimelineStorageUtils.invertInt(
Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
return ApplicationId.newInstance(clusterTs, seqId).toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
new file mode 100644
index 0000000..afe4d6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bunch of utility functions used in HBase TimelineService backend.
+ */
+public final class HBaseTimelineStorageUtils {
+
+ /** milliseconds in one day. */
+ public static final long MILLIS_ONE_DAY = 86400000L;
+
+ private HBaseTimelineStorageUtils() {
+ }
+
+ /**
+ * Combines the input array of attributes and the input aggregation operation
+ * into a new array of attributes.
+ *
+ * @param attributes Attributes to be combined.
+ * @param aggOp Aggregation operation.
+ * @return array of combined attributes.
+ */
+ public static Attribute[] combineAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+ Attribute[] combinedAttributes = new Attribute[newLength];
+
+ if (attributes != null) {
+ System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+ }
+
+ if (aggOp != null) {
+ Attribute a2 = aggOp.getAttribute();
+ combinedAttributes[newLength - 1] = a2;
+ }
+ return combinedAttributes;
+ }
+
+ /**
+ * Returns a number for the new array size. The new array is the combination
+ * of input array of attributes and the input aggregation operation.
+ *
+ * @param attributes Attributes.
+ * @param aggOp Aggregation operation.
+ * @return the size for the new array
+ */
+ private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int oldLength = getAttributesLength(attributes);
+ int aggLength = getAppOpLength(aggOp);
+ return oldLength + aggLength;
+ }
+
+ private static int getAppOpLength(AggregationOperation aggOp) {
+ if (aggOp != null) {
+ return 1;
+ }
+ return 0;
+ }
+
+ private static int getAttributesLength(Attribute[] attributes) {
+ if (attributes != null) {
+ return attributes.length;
+ }
+ return 0;
+ }
+
+ /**
+ * Returns the first seen aggregation operation as seen in the list of input
+ * tags or null otherwise.
+ *
+ * @param tags list of HBase tags.
+ * @return AggregationOperation
+ */
+ public static AggregationOperation getAggregationOperationFromTagsList(
+ List<Tag> tags) {
+ for (AggregationOperation aggOp : AggregationOperation.values()) {
+ for (Tag tag : tags) {
+ if (tag.getType() == aggOp.getTagType()) {
+ return aggOp;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a {@link Tag} from the input attribute.
+ *
+ * @param attribute Attribute from which tag has to be fetched.
+ * @return a HBase Tag.
+ */
+ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
+ // attribute could be either an Aggregation Operation or
+ // an Aggregation Dimension
+ // Get the Tag type from either
+ AggregationOperation aggOp = AggregationOperation
+ .getAggregationOperation(attribute.getKey());
+ if (aggOp != null) {
+ Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+ return t;
+ }
+
+ AggregationCompactionDimension aggCompactDim =
+ AggregationCompactionDimension.getAggregationCompactionDimension(
+ attribute.getKey());
+ if (aggCompactDim != null) {
+ Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ return t;
+ }
+ return null;
+ }
+
+ /**
+ * creates a new cell based on the input cell but with the new value.
+ *
+ * @param origCell Original cell
+ * @param newValue new cell value
+ * @return cell
+ * @throws IOException while creating new cell.
+ */
+ public static Cell createNewCell(Cell origCell, byte[] newValue)
+ throws IOException {
+ return CellUtil.createCell(CellUtil.cloneRow(origCell),
+ CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+ origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+ }
+
+ /**
+ * creates a cell with the given inputs.
+ *
+ * @param row row of the cell to be created
+ * @param family column family name of the new cell
+ * @param qualifier qualifier for the new cell
+ * @param ts timestamp of the new cell
+ * @param newValue value of the new cell
+ * @param tags tags in the new cell
+ * @return cell
+ * @throws IOException while creating the cell.
+ */
+ public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
+ long ts, byte[] newValue, byte[] tags) throws IOException {
+ return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
+ newValue, tags);
+ }
+
+ /**
+ * returns app id from the list of tags.
+ *
+ * @param tags cell tags to be looked into
+ * @return App Id as the AggregationCompactionDimension
+ */
+ public static String getAggregationCompactionDimension(List<Tag> tags) {
+ String appId = null;
+ for (Tag t : tags) {
+ if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+ .getType()) {
+ appId = Bytes.toString(t.getValue());
+ return appId;
+ }
+ }
+ return appId;
+ }
+
+ /**
+ * Converts an int into it's inverse int to be used in (row) keys
+ * where we want to have the largest int value in the top of the table
+ * (scans start at the largest int first).
+ *
+ * @param key value to be inverted so that the latest version will be first in
+ * a scan.
+ * @return inverted int
+ */
+ public static int invertInt(int key) {
+ return Integer.MAX_VALUE - key;
+ }
+
+ /**
+ * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
+ * for a given input timestamp.
+ *
+ * @param ts Timestamp.
+ * @return timestamp of that day's beginning (midnight)
+ */
+ public static long getTopOfTheDayTimestamp(long ts) {
+ long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+ return dayTimestamp;
+ }
+
+ /**
+ * @param conf Yarn configuration. Used to see if there is an explicit config
+ * pointing to the HBase config file to read. If null then a new
+ * HBase configuration will be returned.
+ * @return a configuration with the HBase configuration from the classpath,
+ * optionally overwritten by the timeline service configuration URL if
+ * specified.
+ * @throws MalformedURLException if a timeline service HBase configuration URL
+ * is specified but is a malformed URL.
+ */
+ public static Configuration getTimelineServiceHBaseConf(Configuration conf)
+ throws MalformedURLException {
+ Configuration hbaseConf;
+
+ if (conf == null) {
+ return HBaseConfiguration.create();
+ }
+
+ String timelineServiceHBaseConfFileURL =
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
+ if (timelineServiceHBaseConfFileURL != null
+ && timelineServiceHBaseConfFileURL.length() > 0) {
+ // create a clone so that we don't mess with out input one
+ hbaseConf = new Configuration(conf);
+ Configuration plainHBaseConf = new Configuration(false);
+ URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
+ plainHBaseConf.addResource(hbaseSiteXML);
+ HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
+ } else {
+ // default to what is on the classpath
+ hbaseConf = HBaseConfiguration.create(conf);
+ }
+ return hbaseConf;
+ }
+
+ /**
+ * Given a row key prefix stored in a byte array, return a byte array for its
+ * immediate next row key.
+ *
+ * @param rowKeyPrefix The provided row key prefix, represented in an array.
+ * @return the closest next row key of the provided row key.
+ */
+ public static byte[] calculateTheClosestNextRowKeyForPrefix(
+ byte[] rowKeyPrefix) {
+ // Essentially we are treating it like an 'unsigned very very long' and
+ // doing +1 manually.
+ // Search for the place where the trailing 0xFFs start
+ int offset = rowKeyPrefix.length;
+ while (offset > 0) {
+ if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+ break;
+ }
+ offset--;
+ }
+
+ if (offset == 0) {
+ // We got an 0xFFFF... (only FFs) stopRow value which is
+ // the last possible prefix before the end of the table.
+ // So set it to stop at the 'end of the table'
+ return HConstants.EMPTY_END_ROW;
+ }
+
+ // Copy the right length of the original
+ byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+ // And increment the last one
+ newStopRow[newStopRow.length - 1]++;
+ return newStopRow;
+ }
+
+ /**
+ * Checks if passed object is of integral type(Short/Integer/Long).
+ *
+ * @param obj Object to be checked.
+ * @return true if object passed is of type Short or Integer or Long, false
+ * otherwise.
+ */
+ public static boolean isIntegralValue(Object obj) {
+ return (obj instanceof Short) || (obj instanceof Integer) ||
+ (obj instanceof Long);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
index 600601a..6ab69f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -40,7 +40,7 @@ public final class LongConverter implements NumericValueConverter,
@Override
public byte[] encodeValue(Object value) throws IOException {
- if (!TimelineStorageUtils.isIntegralValue(value)) {
+ if (!HBaseTimelineStorageUtils.isIntegralValue(value)) {
throw new IOException("Expected integral value");
}
return Bytes.toBytes(((Number)value).longValue());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 4b5fa07..9b83659 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,32 +18,18 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
@@ -52,9 +38,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* A bunch of utility functions used across TimelineReader and TimelineWriter.
@@ -67,133 +50,6 @@ public final class TimelineStorageUtils {
private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
- /** milliseconds in one day. */
- public static final long MILLIS_ONE_DAY = 86400000L;
-
- /**
- * Converts an int into it's inverse int to be used in (row) keys
- * where we want to have the largest int value in the top of the table
- * (scans start at the largest int first).
- *
- * @param key value to be inverted so that the latest version will be first in
- * a scan.
- * @return inverted int
- */
- public static int invertInt(int key) {
- return Integer.MAX_VALUE - key;
- }
-
- /**
- * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
- * for a given input timestamp.
- *
- * @param ts Timestamp.
- * @return timestamp of that day's beginning (midnight)
- */
- public static long getTopOfTheDayTimestamp(long ts) {
- long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
- return dayTimestamp;
- }
-
- /**
- * Combines the input array of attributes and the input aggregation operation
- * into a new array of attributes.
- *
- * @param attributes Attributes to be combined.
- * @param aggOp Aggregation operation.
- * @return array of combined attributes.
- */
- public static Attribute[] combineAttributes(Attribute[] attributes,
- AggregationOperation aggOp) {
- int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
- Attribute[] combinedAttributes = new Attribute[newLength];
-
- if (attributes != null) {
- System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
- }
-
- if (aggOp != null) {
- Attribute a2 = aggOp.getAttribute();
- combinedAttributes[newLength - 1] = a2;
- }
- return combinedAttributes;
- }
-
- /**
- * Returns a number for the new array size. The new array is the combination
- * of input array of attributes and the input aggregation operation.
- *
- * @param attributes Attributes.
- * @param aggOp Aggregation operation.
- * @return the size for the new array
- */
- private static int getNewLengthCombinedAttributes(Attribute[] attributes,
- AggregationOperation aggOp) {
- int oldLength = getAttributesLength(attributes);
- int aggLength = getAppOpLength(aggOp);
- return oldLength + aggLength;
- }
-
- private static int getAppOpLength(AggregationOperation aggOp) {
- if (aggOp != null) {
- return 1;
- }
- return 0;
- }
-
- private static int getAttributesLength(Attribute[] attributes) {
- if (attributes != null) {
- return attributes.length;
- }
- return 0;
- }
-
- /**
- * Returns the first seen aggregation operation as seen in the list of input
- * tags or null otherwise.
- *
- * @param tags list of HBase tags.
- * @return AggregationOperation
- */
- public static AggregationOperation getAggregationOperationFromTagsList(
- List<Tag> tags) {
- for (AggregationOperation aggOp : AggregationOperation.values()) {
- for (Tag tag : tags) {
- if (tag.getType() == aggOp.getTagType()) {
- return aggOp;
- }
- }
- }
- return null;
- }
-
- /**
- * Creates a {@link Tag} from the input attribute.
- *
- * @param attribute Attribute from which tag has to be fetched.
- * @return a HBase Tag.
- */
- public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
- // attribute could be either an Aggregation Operation or
- // an Aggregation Dimension
- // Get the Tag type from either
- AggregationOperation aggOp = AggregationOperation
- .getAggregationOperation(attribute.getKey());
- if (aggOp != null) {
- Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
- return t;
- }
-
- AggregationCompactionDimension aggCompactDim =
- AggregationCompactionDimension.getAggregationCompactionDimension(
- attribute.getKey());
- if (aggCompactDim != null) {
- Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
- return t;
- }
- return null;
- }
-
/**
* Matches key-values filter. Used for relatesTo/isRelatedTo filters.
*
@@ -520,125 +376,4 @@ public final class TimelineStorageUtils {
(obj instanceof Long);
}
- /**
- * creates a new cell based on the input cell but with the new value.
- *
- * @param origCell Original cell
- * @param newValue new cell value
- * @return cell
- * @throws IOException while creating new cell.
- */
- public static Cell createNewCell(Cell origCell, byte[] newValue)
- throws IOException {
- return CellUtil.createCell(CellUtil.cloneRow(origCell),
- CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
- origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
- }
-
- /**
- * creates a cell with the given inputs.
- *
- * @param row row of the cell to be created
- * @param family column family name of the new cell
- * @param qualifier qualifier for the new cell
- * @param ts timestamp of the new cell
- * @param newValue value of the new cell
- * @param tags tags in the new cell
- * @return cell
- * @throws IOException while creating the cell.
- */
- public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
- long ts, byte[] newValue, byte[] tags) throws IOException {
- return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
- newValue, tags);
- }
-
- /**
- * returns app id from the list of tags.
- *
- * @param tags cell tags to be looked into
- * @return App Id as the AggregationCompactionDimension
- */
- public static String getAggregationCompactionDimension(List<Tag> tags) {
- String appId = null;
- for (Tag t : tags) {
- if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
- .getType()) {
- appId = Bytes.toString(t.getValue());
- return appId;
- }
- }
- return appId;
- }
-
-
- /**
- * @param conf Yarn configuration. Used to see if there is an explicit config
- * pointing to the HBase config file to read. If null then a new
- * HBase configuration will be returned.
- * @return a configuration with the HBase configuration from the classpath,
- * optionally overwritten by the timeline service configuration URL if
- * specified.
- * @throws MalformedURLException if a timeline service HBase configuration URL
- * is specified but is a malformed URL.
- */
- public static Configuration getTimelineServiceHBaseConf(Configuration conf)
- throws MalformedURLException {
- Configuration hbaseConf;
-
- if (conf == null) {
- return HBaseConfiguration.create();
- }
-
- String timelineServiceHBaseConfFileURL =
- conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
- if (timelineServiceHBaseConfFileURL != null
- && timelineServiceHBaseConfFileURL.length() > 0) {
- // create a clone so that we don't mess with out input one
- hbaseConf = new Configuration(conf);
- Configuration plainHBaseConf = new Configuration(false);
- URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
- plainHBaseConf.addResource(hbaseSiteXML);
- HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
- } else {
- // default to what is on the classpath
- hbaseConf = HBaseConfiguration.create(conf);
- }
- return hbaseConf;
- }
-
- /**
- * Given a row key prefix stored in a byte array, return a byte array for its
- * immediate next row key.
- *
- * @param rowKeyPrefix The provided row key prefix, represented in an array.
- * @return the closest next row key of the provided row key.
- */
- public static byte[] calculateTheClosestNextRowKeyForPrefix(
- byte[] rowKeyPrefix) {
- // Essentially we are treating it like an 'unsigned very very long' and
- // doing +1 manually.
- // Search for the place where the trailing 0xFFs start
- int offset = rowKeyPrefix.length;
- while (offset > 0) {
- if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
- break;
- }
- offset--;
- }
-
- if (offset == 0) {
- // We got an 0xFFFF... (only FFs) stopRow value which is
- // the last possible prefix before the end of the table.
- // So set it to stop at the 'end of the table'
- return HConstants.EMPTY_END_ROW;
- }
-
- // Copy the right length of the original
- byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
- // And increment the last one
- newStopRow[newStopRow.length - 1]++;
- return newStopRow;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 71c3d90..439e0c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -26,9 +26,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -144,8 +144,8 @@ public enum FlowActivityColumnPrefix
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@@ -269,8 +269,8 @@ public enum FlowActivityColumnPrefix
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index d10608a..bb77e36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the flow activity table.
@@ -59,7 +59,7 @@ public class FlowActivityRowKey {
String flowName, boolean convertDayTsToTopOfDay) {
this.clusterId = clusterId;
if (convertDayTsToTopOfDay && (timestamp != null)) {
- this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+ this.dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
} else {
this.dayTs = timestamp;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 2e7a9d8..90dd345 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -113,8 +113,8 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
Object inputValue, Attribute... attributes) throws IOException {
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, aggOp);
+ Attribute[] combinedAttributes =
+ HBaseTimelineStorageUtils.combineAttributes(attributes, aggOp);
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue, combinedAttributes);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index e74282a..278d18e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -136,7 +136,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
- TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@@ -163,7 +163,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
- TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+ HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 5c7b069..122d399 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
/**
@@ -107,7 +107,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
- Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
+ Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
tags.add(t);
}
byte[] tagByteArray = Tag.fromList(tags);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6e67722..0e3c8ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@@ -249,7 +249,7 @@ class FlowScanner implements RegionScanner, Closeable {
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
- return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
+ return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
}
/**
@@ -323,7 +323,7 @@ class FlowScanner implements RegionScanner, Closeable {
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
- String aggDim = TimelineStorageUtils
+ String aggDim = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
// if this agg dimension has already been seen,
@@ -418,7 +418,8 @@ class FlowScanner implements RegionScanner, Closeable {
sum = converter.add(sum, currentValue);
}
byte[] sumBytes = converter.encodeValue(sum);
- Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
+ Cell sumCell =
+ HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
return sumCell;
}
@@ -460,7 +461,7 @@ class FlowScanner implements RegionScanner, Closeable {
// if this is the existing flow sum cell
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
- String appId = TimelineStorageUtils
+ String appId = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
if (appId == FLOW_APP_ID) {
sum = converter.add(sum, currentValue);
@@ -502,7 +503,7 @@ class FlowScanner implements RegionScanner, Closeable {
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
- Cell sumCell = TimelineStorageUtils.createNewCell(
+ Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),
CellUtil.cloneQualifier(anyCell),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
index b69cf76..fd85878 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -153,8 +153,8 @@ public final class EntityTypeReader extends AbstractTimelineStorageReader {
System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
entityTypeEncoded.length);
- return
- TimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(currRowKey);
+ return HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+ currRowKey);
}
private ResultScanner getResult(Configuration hbaseConf, Connection conn,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf8e3a8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
index c4c8dce..6c6d1b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -189,7 +189,7 @@ public class TestRowKeys {
@Test
public void testFlowActivityRowKey() {
Long ts = 1459900830000L;
- Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+ Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
byte[] byteRowKey =
new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org