You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:27 UTC
[10/50] [abbrv] hadoop git commit: YARN-4025. Deal with byte
representations of Longs in writer code. Contributed by Sangjin Lee and
Vrushali C.
YARN-4025. Deal with byte representations of Longs in writer code. Contributed by Sangjin Lee and Vrushali C.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/233bfc96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/233bfc96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/233bfc96
Branch: refs/heads/feature-YARN-2928
Commit: 233bfc9690d1cd6cc9a872e65d9a7a9b17a3946b
Parents: acbf140
Author: Junping Du <ju...@apache.org>
Authored: Wed Aug 19 10:00:33 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:56 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../storage/HBaseTimelineReaderImpl.java | 68 +++---
.../storage/HBaseTimelineWriterImpl.java | 20 +-
.../application/ApplicationColumnPrefix.java | 40 ++++
.../storage/application/ApplicationTable.java | 6 +-
.../storage/common/ColumnHelper.java | 99 ++++++++-
.../storage/common/Separator.java | 16 +-
.../storage/common/TimelineWriterUtils.java | 9 +-
.../storage/entity/EntityColumnPrefix.java | 40 ++++
.../storage/entity/EntityTable.java | 6 +-
.../storage/TestHBaseTimelineWriterImpl.java | 207 +++++++++++--------
11 files changed, 373 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a36c288..b72c4a6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -94,6 +94,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3045. Implement NM writing container lifecycle events to Timeline
Service v2. (Naganarasimha G R via junping_du)
+ YARN-4025. Deal with byte representations of Longs in writer code.
+ (Sangjin Lee and Vrushali C via junping_du)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 094f868..c514c20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -19,12 +19,9 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
@@ -431,44 +428,51 @@ public class HBaseTimelineReaderImpl
Map<String, Object> columns = prefix.readResults(result);
if (isConfig) {
for (Map.Entry<String, Object> column : columns.entrySet()) {
- entity.addConfig(column.getKey(), column.getKey().toString());
+ entity.addConfig(column.getKey(), column.getValue().toString());
}
} else {
entity.addInfo(columns);
}
}
+ /**
+ * Read events from the entity table or the application table. The column name
+ * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+ * if there is no info associated with the event.
+ *
+ * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+ * schema description.
+ */
private static void readEvents(TimelineEntity entity, Result result,
boolean isApplication) throws IOException {
Map<String, TimelineEvent> eventsMap = new HashMap<>();
- Map<String, Object> eventsResult = isApplication ?
- ApplicationColumnPrefix.EVENT.readResults(result) :
- EntityColumnPrefix.EVENT.readResults(result);
- for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) {
- Collection<String> tokens =
- Separator.VALUES.splitEncoded(eventResult.getKey());
- if (tokens.size() != 2 && tokens.size() != 3) {
- throw new IOException(
- "Invalid event column name: " + eventResult.getKey());
- }
- Iterator<String> idItr = tokens.iterator();
- String id = idItr.next();
- String tsStr = idItr.next();
- // TODO: timestamp is not correct via ser/des through UTF-8 string
- Long ts =
- TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes(
- StandardCharsets.UTF_8)));
- String key = Separator.VALUES.joinEncoded(id, ts.toString());
- TimelineEvent event = eventsMap.get(key);
- if (event == null) {
- event = new TimelineEvent();
- event.setId(id);
- event.setTimestamp(ts);
- eventsMap.put(key, event);
- }
- if (tokens.size() == 3) {
- String infoKey = idItr.next();
- event.addInfo(infoKey, eventResult.getValue());
+ Map<?, Object> eventsResult = isApplication ?
+ ApplicationColumnPrefix.EVENT.
+ readResultsHavingCompoundColumnQualifiers(result) :
+ EntityColumnPrefix.EVENT.
+ readResultsHavingCompoundColumnQualifiers(result);
+ for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+ byte[][] karr = (byte[][])eventResult.getKey();
+ // the column name is of the form "eventId=timestamp=infoKey"
+ if (karr.length == 3) {
+ String id = Bytes.toString(karr[0]);
+ long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+ String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+ TimelineEvent event = eventsMap.get(key);
+ if (event == null) {
+ event = new TimelineEvent();
+ event.setId(id);
+ event.setTimestamp(ts);
+ eventsMap.put(key, event);
+ }
+ // handle empty info
+ String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+ if (infoKey != null) {
+ event.addInfo(infoKey, eventResult.getValue());
+ }
+ } else {
+ LOG.warn("incorrectly formatted column name: it will be discarded");
+ continue;
}
}
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 96192cc..772002d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -300,25 +300,27 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
byte[] compoundColumnQualifierBytes =
Separator.VALUES.join(columnQualifierWithTsBytes,
null);
- String compoundColumnQualifier =
- Bytes.toString(compoundColumnQualifierBytes);
- EntityColumnPrefix.EVENT.store(rowKey, entityTable,
- compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES);
+ if (isApplication) {
+ ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+ compoundColumnQualifierBytes, null,
+ TimelineWriterUtils.EMPTY_BYTES);
+ } else {
+ EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+ compoundColumnQualifierBytes, null,
+ TimelineWriterUtils.EMPTY_BYTES);
+ }
} else {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
// eventId?infoKey
byte[] compoundColumnQualifierBytes =
Separator.VALUES.join(columnQualifierWithTsBytes,
Bytes.toBytes(info.getKey()));
- // convert back to string to avoid additional API on store.
- String compoundColumnQualifier =
- Bytes.toString(compoundColumnQualifierBytes);
if (isApplication) {
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
- compoundColumnQualifier, null, info.getValue());
+ compoundColumnQualifierBytes, null, info.getValue());
} else {
EntityColumnPrefix.EVENT.store(rowKey, entityTable,
- compoundColumnQualifier, null, info.getValue());
+ compoundColumnQualifierBytes, null, info.getValue());
}
} // for info: eventInfo
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index cd9e845..ad1def6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -111,6 +111,31 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
*/
public void store(byte[] rowKey,
+ TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
+ Long timestamp, Object inputValue) throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier =
+ ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ */
+ public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
Long timestamp, Object inputValue) throws IOException {
@@ -150,6 +175,21 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
return column.readResults(result, columnPrefixBytes);
}
+ /**
+ * @param result from which to read columns
+ * @return the latest values of columns in the column family. The column
+ * qualifier is returned as a list of parts, each part a byte[]. This
+ * is to facilitate returning byte arrays of values that were not
+ * Strings. If they can be treated as Strings, you should use
+ * {@link #readResults(Result)} instead.
+ * @throws IOException
+ */
+ public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+ throws IOException {
+ return column.readResultsHavingCompoundColumnQualifiers(result,
+ columnPrefixBytes);
+ }
+
/*
* (non-Javadoc)
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
index d2a2cb9..a997997 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
@@ -57,12 +57,12 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
* | | infoValue | metricValue1 | |
* | | | @timestamp2 | |
* | | r!relatesToKey: | | |
- * | | id3?id4?id5 | | |
+ * | | id3=id4=id5 | | |
* | | | | |
* | | s!isRelatedToKey: | | |
- * | | id7?id9?id6 | | |
+ * | | id7=id9=id6 | | |
* | | | | |
- * | | e!eventId?timestamp?infoKey: | | |
+ * | | e!eventId=timestamp=infoKey: | | |
* | | eventInfoValue | | |
* | | | | |
* | | flowVersion: | | |
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index a902924..f1b7c58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -24,6 +24,8 @@ import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
* @param <T> refers to the table.
*/
public class ColumnHelper<T> {
+ private static final Log LOG = LogFactory.getLog(ColumnHelper.class);
private final ColumnFamily<T> columnFamily;
@@ -143,6 +146,7 @@ public class ColumnHelper<T> {
.entrySet()) {
String columnName = null;
if (columnPrefixBytes == null) {
+ LOG.info("null prefix was specified; returning all columns");
// Decode the spaces we encoded in the column name.
columnName = Separator.decode(entry.getKey(), Separator.SPACE);
} else {
@@ -181,32 +185,43 @@ public class ColumnHelper<T> {
/**
* @param result from which to read columns
* @param columnPrefixBytes optional prefix to limit columns. If null all
- * columns are returned.
- * @return the latest values of columns in the column family.
+ * columns are returned.
+ * @return the latest values of columns in the column family. This assumes
+ * that the column name parts are all Strings by default. If the
+ * column name parts should be treated natively and not be converted
+ * back and forth from Strings, you should use
+ * {@link #readResultsHavingCompoundColumnQualifiers(Result, byte[])}
+ * instead.
* @throws IOException
*/
- public Map<String, Object> readResults(Result result, byte[] columnPrefixBytes)
- throws IOException {
+ public Map<String, Object> readResults(Result result,
+ byte[] columnPrefixBytes) throws IOException {
Map<String, Object> results = new HashMap<String, Object>();
if (result != null) {
Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
for (Entry<byte[], byte[]> entry : columns.entrySet()) {
- if (entry.getKey() != null && entry.getKey().length > 0) {
+ byte[] columnKey = entry.getKey();
+ if (columnKey != null && columnKey.length > 0) {
String columnName = null;
if (columnPrefixBytes == null) {
+ LOG.info("null prefix was specified; returning all columns");
// Decode the spaces we encoded in the column name.
- columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+ columnName = Separator.decode(columnKey, Separator.SPACE);
} else {
// A non-null prefix means columns are actually of the form
// prefix!columnNameRemainder
byte[][] columnNameParts =
- Separator.QUALIFIERS.split(entry.getKey(), 2);
+ Separator.QUALIFIERS.split(columnKey, 2);
byte[] actualColumnPrefixBytes = columnNameParts[0];
if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
&& columnNameParts.length == 2) {
// This is the prefix that we want
+ // if the column name is a compound qualifier
+ // with non string datatypes, the following decode will not
+ // work correctly since it considers all components to be String
+ // invoke the readResultsHavingCompoundColumnQualifiers function
columnName = Separator.decode(columnNameParts[1]);
}
}
@@ -223,6 +238,56 @@ public class ColumnHelper<T> {
}
/**
+ * @param result from which to read columns
+ * @param columnPrefixBytes optional prefix to limit columns. If null all
+ * columns are returned.
+ * @return the latest values of columns in the column family. If the column
+ * prefix is null, the column qualifier is returned as Strings. For a
+ * non-null column prefix bytes, the column qualifier is returned as
+ * a list of parts, each part a byte[]. This is to facilitate
+ * returning byte arrays of values that were not Strings.
+ * @throws IOException
+ */
+ public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result,
+ byte[] columnPrefixBytes) throws IOException {
+ // handle the case where the column prefix is null
+ // it is the same as readResults() so simply delegate to that implementation
+ if (columnPrefixBytes == null) {
+ return readResults(result, null);
+ }
+
+ Map<byte[][], Object> results = new HashMap<byte[][], Object>();
+
+ if (result != null) {
+ Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+ for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+ byte[] columnKey = entry.getKey();
+ if (columnKey != null && columnKey.length > 0) {
+ // A non-null prefix means columns are actually of the form
+ // prefix!columnNameRemainder
+ // with a compound column qualifier, we are presuming existence of a
+ // prefix
+ byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
+ if (columnNameParts.length > 0) {
+ byte[] actualColumnPrefixBytes = columnNameParts[0];
+ if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+ && columnNameParts.length == 2) {
+ // This is the prefix that we want
+ byte[][] columnQualifierParts =
+ Separator.VALUES.split(columnNameParts[1]);
+ Object value = GenericObjectMapper.read(entry.getValue());
+ // we return the columnQualifier in parts since we don't know
+ // which part is of which data type
+ results.put(columnQualifierParts, value);
+ }
+ }
+ }
+ } // for entry
+ }
+ return results;
+ }
+
+ /**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.
* @param qualifier for the remainder of the column. Any
@@ -247,4 +312,24 @@ public class ColumnHelper<T> {
return columnQualifier;
}
+ /**
+ * @param columnPrefixBytes The byte representation for the column prefix.
+ * Should not contain {@link Separator#QUALIFIERS}.
+ * @param qualifier the byte representation for the remainder of the column.
+ * @return fully sanitized column qualifier that is a combination of prefix
+ * and qualifier. If prefix is null, the result is simply the encoded
+ * qualifier without any separator.
+ */
+ public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+ byte[] qualifier) {
+
+ if (columnPrefixBytes == null) {
+ return qualifier;
+ }
+
+ byte[] columnQualifier =
+ Separator.QUALIFIERS.join(columnPrefixBytes, qualifier);
+ return columnQualifier;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index 3319419..9f91af8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -37,7 +37,7 @@ public enum Separator {
/**
* separator in values, and/or compound key/column qualifier fields.
*/
- VALUES("?", "%1$"),
+ VALUES("=", "%1$"),
/**
* separator in values, often used to avoid having these in qualifiers and
@@ -299,12 +299,22 @@ public enum Separator {
* up to a maximum of count items. This will naturally produce copied byte
* arrays for each of the split segments.
* @param source to be split
- * @param limit on how many segments are supposed to be returned. Negative
- * value indicates no limit on number of segments.
+ * @param limit on how many segments are supposed to be returned. A
+ * non-positive value indicates no limit on number of segments.
* @return source split by this separator.
*/
public byte[][] split(byte[] source, int limit) {
return TimelineWriterUtils.split(source, this.bytes, limit);
}
+ /**
+ * Splits the source array into multiple array segments using this separator,
+ * as many times as splits are found. This will naturally produce copied byte
+ * arrays for each of the split segments.
+ * @param source to be split
+ * @return source split by this separator.
+ */
+ public byte[][] split(byte[] source) {
+ return TimelineWriterUtils.split(source, this.bytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
index c957bf5..58bdedc7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -33,6 +33,9 @@ public class TimelineWriterUtils {
/** empty bytes */
public static final byte[] EMPTY_BYTES = new byte[0];
+ /** indicator for no limits for splitting */
+ public static final int NO_LIMIT_SPLIT = -1;
+
/**
* Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce
@@ -45,7 +48,7 @@ public class TimelineWriterUtils {
* @return byte[] array after splitting the source
*/
public static byte[][] split(byte[] source, byte[] separator) {
- return split(source, separator, -1);
+ return split(source, separator, NO_LIMIT_SPLIT);
}
/**
@@ -57,7 +60,7 @@ public class TimelineWriterUtils {
*
* @param source
* @param separator
- * @param limit a negative value indicates no limit on number of segments.
+ * @param limit a non-positive value indicates no limit on number of segments.
* @return byte[][] after splitting the input source
*/
public static byte[][] split(byte[] source, byte[] separator, int limit) {
@@ -81,7 +84,7 @@ public class TimelineWriterUtils {
* separator byte array.
*/
public static List<Range> splitRanges(byte[] source, byte[] separator) {
- return splitRanges(source, separator, -1);
+ return splitRanges(source, separator, NO_LIMIT_SPLIT);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index c8485c0..75ff742 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -131,6 +131,31 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ */
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
+ Long timestamp, Object inputValue) throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier =
+ ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
*/
public Object readResult(Result result, String qualifier) throws IOException {
@@ -150,6 +175,21 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
return column.readResults(result, columnPrefixBytes);
}
+ /**
+ * @param result from which to read columns
+ * @return the latest values of columns in the column family. The column
+ * qualifier is returned as a list of parts, each part a byte[]. This
+ * is to facilitate returning byte arrays of values that were not
+ * Strings. If they can be treated as Strings, you should use
+ * {@link #readResults(Result)} instead.
+ * @throws IOException
+ */
+ public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+ throws IOException {
+ return column.readResultsHavingCompoundColumnQualifiers(result,
+ columnPrefixBytes);
+ }
+
/*
* (non-Javadoc)
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 9a8bd8c..96773b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -58,12 +58,12 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
* | | infoValue | | |
* | | | | |
* | | r!relatesToKey: | | |
- * | | id3?id4?id5 | | |
+ * | | id3=id4=id5 | | |
* | | | | |
* | | s!isRelatedToKey | | |
- * | | id7?id9?id6 | | |
+ * | | id7=id9=id6 | | |
* | | | | |
- * | | e!eventId?timestamp?infoKey: | | |
+ * | | e!eventId=timestamp=infoKey: | | |
* | | eventInfoValue | | |
* | | | | |
* | | flowVersion: | | |
http://git-wip-us.apache.org/repos/asf/hadoop/blob/233bfc96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index 95f88d1..2875e01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -27,8 +27,8 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
@@ -75,7 +76,7 @@ import org.junit.Test;
* even if other records exist in the table. Use a different cluster name if
* you add a new test.
*/
-public class TestHBaseTimelineWriterImpl {
+public class TestHBaseTimelineStorage {
private static HBaseTestingUtility util;
@@ -101,8 +102,8 @@ public class TestHBaseTimelineWriterImpl {
ApplicationEntity entity = new ApplicationEntity();
String id = "hello";
entity.setId(id);
- Long cTime = 1425016501000L;
- Long mTime = 1425026901000L;
+ long cTime = 1425016501000L;
+ long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime);
@@ -197,19 +198,16 @@ public class TestHBaseTimelineWriterImpl {
Number val =
(Number) ApplicationColumn.CREATED_TIME.readResult(result);
- Long cTime1 = val.longValue();
+ long cTime1 = val.longValue();
assertEquals(cTime1, cTime);
val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
- Long mTime1 = val.longValue();
+ long mTime1 = val.longValue();
assertEquals(mTime1, mTime);
Map<String, Object> infoColumns =
ApplicationColumnPrefix.INFO.readResults(result);
- assertEquals(infoMap.size(), infoColumns.size());
- for (String infoItem : infoMap.keySet()) {
- assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
- }
+ assertEquals(infoMap, infoColumns);
// Remember isRelatedTo is of type Map<String, Set<String>>
for (String isRelatedToKey : isRelatedTo.keySet()) {
@@ -245,27 +243,15 @@ public class TestHBaseTimelineWriterImpl {
// Configuration
Map<String, Object> configColumns =
ApplicationColumnPrefix.CONFIG.readResults(result);
- assertEquals(conf.size(), configColumns.size());
- for (String configItem : conf.keySet()) {
- assertEquals(conf.get(configItem), configColumns.get(configItem));
- }
+ assertEquals(conf, configColumns);
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
- // We got metrics back
- assertNotNull(metricMap);
- // Same number of metrics as we wrote
- assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
-
- // Iterate over original metrics and confirm that they are present
- // here.
- for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
- assertEquals(metricEntry.getValue(),
- metricMap.get(metricEntry.getKey()));
- }
+ assertEquals(metricValues, metricMap);
+ // read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
@@ -274,6 +260,31 @@ public class TestHBaseTimelineWriterImpl {
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
+
+ // verify attributes
+ assertEquals(id, e1.getId());
+ assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
+ e1.getType());
+ assertEquals(cTime, e1.getCreatedTime());
+ assertEquals(mTime, e1.getModifiedTime());
+ Map<String, Object> infoMap2 = e1.getInfo();
+ assertEquals(infoMap, infoMap2);
+
+ Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+ assertEquals(isRelatedTo, isRelatedTo2);
+
+ Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+ assertEquals(relatesTo, relatesTo2);
+
+ Map<String, String> conf2 = e1.getConfigs();
+ assertEquals(conf, conf2);
+
+ Set<TimelineMetric> metrics2 = e1.getMetrics();
+ assertEquals(metrics, metrics2);
+ for (TimelineMetric metric2 : metrics2) {
+ Map<Long, Number> metricValues2 = metric2.getValues();
+ assertEquals(metricValues, metricValues2);
+ }
} finally {
if (hbi != null) {
hbi.stop();
@@ -294,8 +305,8 @@ public class TestHBaseTimelineWriterImpl {
String type = "world";
entity.setId(id);
entity.setType(type);
- Long cTime = 1425016501000L;
- Long mTime = 1425026901000L;
+ long cTime = 1425016501000L;
+ long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime);
@@ -396,20 +407,16 @@ public class TestHBaseTimelineWriterImpl {
assertEquals(type, type1);
Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
- Long cTime1 = val.longValue();
+ long cTime1 = val.longValue();
assertEquals(cTime1, cTime);
val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
- Long mTime1 = val.longValue();
+ long mTime1 = val.longValue();
assertEquals(mTime1, mTime);
Map<String, Object> infoColumns =
EntityColumnPrefix.INFO.readResults(result);
- assertEquals(infoMap.size(), infoColumns.size());
- for (String infoItem : infoMap.keySet()) {
- assertEquals(infoMap.get(infoItem),
- infoColumns.get(infoItem));
- }
+ assertEquals(infoMap, infoColumns);
// Remember isRelatedTo is of type Map<String, Set<String>>
for (String isRelatedToKey : isRelatedTo.keySet()) {
@@ -447,32 +454,19 @@ public class TestHBaseTimelineWriterImpl {
// Configuration
Map<String, Object> configColumns =
EntityColumnPrefix.CONFIG.readResults(result);
- assertEquals(conf.size(), configColumns.size());
- for (String configItem : conf.keySet()) {
- assertEquals(conf.get(configItem), configColumns.get(configItem));
- }
+ assertEquals(conf, configColumns);
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
- // We got metrics back
- assertNotNull(metricMap);
- // Same number of metrics as we wrote
- assertEquals(metricValues.entrySet().size(), metricMap.entrySet()
- .size());
-
- // Iterate over original metrics and confirm that they are present
- // here.
- for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
- assertEquals(metricEntry.getValue(),
- metricMap.get(metricEntry.getKey()));
- }
+ assertEquals(metricValues, metricMap);
}
}
assertEquals(1, rowCount);
assertEquals(17, colCount);
+ // read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
@@ -481,6 +475,30 @@ public class TestHBaseTimelineWriterImpl {
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
+
+ // verify attributes
+ assertEquals(id, e1.getId());
+ assertEquals(type, e1.getType());
+ assertEquals(cTime, e1.getCreatedTime());
+ assertEquals(mTime, e1.getModifiedTime());
+ Map<String, Object> infoMap2 = e1.getInfo();
+ assertEquals(infoMap, infoMap2);
+
+ Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+ assertEquals(isRelatedTo, isRelatedTo2);
+
+ Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+ assertEquals(relatesTo, relatesTo2);
+
+ Map<String, String> conf2 = e1.getConfigs();
+ assertEquals(conf, conf2);
+
+ Set<TimelineMetric> metrics2 = e1.getMetrics();
+ assertEquals(metrics, metrics2);
+ for (TimelineMetric metric2 : metrics2) {
+ Map<Long, Number> metricValues2 = metric2.getValues();
+ assertEquals(metricValues, metricValues2);
+ }
} finally {
if (hbi != null) {
hbi.stop();
@@ -494,9 +512,9 @@ public class TestHBaseTimelineWriterImpl {
}
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
- String flow, Long runid, String appName, TimelineEntity te) {
+ String flow, long runid, String appName, TimelineEntity te) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
assertTrue(rowKeyComponents.length == 7);
assertEquals(user, Bytes.toString(rowKeyComponents[0]));
@@ -511,9 +529,9 @@ public class TestHBaseTimelineWriterImpl {
}
private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
- String user, String flow, Long runid, String appName) {
+ String user, String flow, long runid, String appName) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
assertTrue(rowKeyComponents.length == 5);
assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
@@ -530,7 +548,7 @@ public class TestHBaseTimelineWriterImpl {
TimelineEvent event = new TimelineEvent();
String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
event.setId(eventId);
- Long expTs = 1436512802000L;
+ long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
@@ -577,24 +595,25 @@ public class TestHBaseTimelineWriterImpl {
assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
appName));
- Map<String, Object> eventsResult =
- ApplicationColumnPrefix.EVENT.readResults(result);
+ Map<?, Object> eventsResult =
+ ApplicationColumnPrefix.EVENT.
+ readResultsHavingCompoundColumnQualifiers(result);
// there should be only one event
assertEquals(1, eventsResult.size());
- // key name for the event
- byte[] compoundColumnQualifierBytes =
- Separator.VALUES.join(Bytes.toBytes(eventId),
- Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
- Bytes.toBytes(expKey));
- String valueKey = Bytes.toString(compoundColumnQualifierBytes);
- for (Map.Entry<String, Object> e : eventsResult.entrySet()) {
- // the value key must match
- assertEquals(valueKey, e.getKey());
+ for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+ // the qualifier is a compound key
+ // hence match individual values
+ byte[][] karr = (byte[][])e.getKey();
+ assertEquals(3, karr.length);
+ assertEquals(eventId, Bytes.toString(karr[0]));
+ assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+ assertEquals(expKey, Bytes.toString(karr[2]));
Object value = e.getValue();
// there should be only one timestamp and value
assertEquals(expVal, value.toString());
}
+ // read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
@@ -613,6 +632,21 @@ public class TestHBaseTimelineWriterImpl {
assertEquals(1, es1.size());
assertEquals(1, es2.size());
assertEquals(es1, es2);
+
+ // check the events
+ NavigableSet<TimelineEvent> events = e1.getEvents();
+ // there should be only one event
+ assertEquals(1, events.size());
+ for (TimelineEvent e : events) {
+ assertEquals(eventId, e.getId());
+ assertEquals(expTs, e.getTimestamp());
+ Map<String,Object> info = e.getInfo();
+ assertEquals(1, info.size());
+ for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
+ assertEquals(expKey, infoEntry.getKey());
+ assertEquals(expVal, infoEntry.getValue());
+ }
+ }
} finally {
if (hbi != null) {
hbi.stop();
@@ -630,7 +664,7 @@ public class TestHBaseTimelineWriterImpl {
TimelineEvent event = new TimelineEvent();
String eventId = "foo_event_id";
event.setId(eventId);
- Long expTs = 1436512802000L;
+ long expTs = 1436512802000L;
event.setTimestamp(expTs);
final TimelineEntity entity = new TimelineEntity();
@@ -678,22 +712,21 @@ public class TestHBaseTimelineWriterImpl {
assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
entity));
- Map<String, Object> eventsResult =
- EntityColumnPrefix.EVENT.readResults(result);
+ Map<?, Object> eventsResult =
+ EntityColumnPrefix.EVENT.
+ readResultsHavingCompoundColumnQualifiers(result);
// there should be only one event
assertEquals(1, eventsResult.size());
- // key name for the event
- byte[] compoundColumnQualifierWithTsBytes =
- Separator.VALUES.join(Bytes.toBytes(eventId),
- Bytes.toBytes(TimelineWriterUtils.invert(expTs)));
- byte[] compoundColumnQualifierBytes =
- Separator.VALUES.join(compoundColumnQualifierWithTsBytes,
- null);
- String valueKey = Bytes.toString(compoundColumnQualifierBytes);
- for (Map.Entry<String, Object> e :
- eventsResult.entrySet()) {
- // the column qualifier key must match
- assertEquals(valueKey, e.getKey());
+ for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+ // the qualifier is a compound key
+ // hence match individual values
+ byte[][] karr = (byte[][])e.getKey();
+ assertEquals(3, karr.length);
+ assertEquals(eventId, Bytes.toString(karr[0]));
+ assertEquals(TimelineWriterUtils.invert(expTs),
+ Bytes.toLong(karr[1]));
+ // key must be empty
+ assertEquals(0, karr[2].length);
Object value = e.getValue();
// value should be empty
assertEquals("", value.toString());
@@ -702,6 +735,7 @@ public class TestHBaseTimelineWriterImpl {
}
assertEquals(1, rowCount);
+ // read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
@@ -710,6 +744,17 @@ public class TestHBaseTimelineWriterImpl {
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
+
+ // check the events
+ NavigableSet<TimelineEvent> events = e1.getEvents();
+ // there should be only one event
+ assertEquals(1, events.size());
+ for (TimelineEvent e : events) {
+ assertEquals(eventId, e.getId());
+ assertEquals(expTs, e.getTimestamp());
+ Map<String,Object> info = e.getInfo();
+ assertTrue(info == null || info.isEmpty());
+ }
} finally {
hbi.stop();
hbi.close();