You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2016/06/10 15:48:45 UTC
[3/4] hadoop git commit: YARN-5170. Eliminate singleton converters
and static method access. (Joep Rottinghuis via Varun Saxena)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
index 32ef1c3..d3ef897 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -30,14 +30,8 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
public final class EventColumnNameConverter
implements KeyConverter<EventColumnName> {
- private static final EventColumnNameConverter INSTANCE =
- new EventColumnNameConverter();
- public static EventColumnNameConverter getInstance() {
- return INSTANCE;
- }
-
- private EventColumnNameConverter() {
+ public EventColumnNameConverter() {
}
// eventId=timestamp=infokey are of types String, Long String
@@ -69,7 +63,7 @@ public final class EventColumnNameConverter
return Separator.VALUES.join(first, Separator.EMPTY_BYTES);
}
byte[] second = Bytes.toBytes(
- TimelineStorageUtils.invertLong(key.getTimestamp()));
+ LongConverter.invertLong(key.getTimestamp()));
if (key.getInfoKey() == null) {
return Separator.VALUES.join(first, second, Separator.EMPTY_BYTES);
}
@@ -96,7 +90,7 @@ public final class EventColumnNameConverter
}
String id = Separator.decode(Bytes.toString(components[0]),
Separator.VALUES, Separator.TAB, Separator.SPACE);
- Long ts = TimelineStorageUtils.invertLong(Bytes.toLong(components[1]));
+ Long ts = LongConverter.invertLong(Bytes.toLong(components[1]));
String infoKey = components[2].length == 0 ? null :
Separator.decode(Bytes.toString(components[2]),
Separator.VALUES, Separator.TAB, Separator.SPACE);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
index 48c56f9..600601a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -26,14 +27,15 @@ import org.apache.hadoop.hbase.util.Bytes;
* Encodes a value by interpreting it as a Long and converting it to bytes and
* decodes a set of bytes as a Long.
*/
-public final class LongConverter implements NumericValueConverter {
- private static final LongConverter INSTANCE = new LongConverter();
+public final class LongConverter implements NumericValueConverter,
+ Serializable {
- private LongConverter() {
- }
+ /**
+ * Added because we implement Comparator<Number>.
+ */
+ private static final long serialVersionUID = 1L;
- public static LongConverter getInstance() {
- return INSTANCE;
+ public LongConverter() {
}
@Override
@@ -76,4 +78,17 @@ public final class LongConverter implements NumericValueConverter {
}
return sum;
}
+
+ /**
+ * Converts a timestamp into its inverse timestamp to be used in (row) keys
+ * where we want to have the most recent timestamp in the top of the table
+ * (scans start at the most recent timestamp first).
+ *
+ * @param key value to be inverted so that the latest version will be first in
+ * a scan.
+ * @return inverted long
+ */
+ public static long invertLong(long key) {
+ return Long.MAX_VALUE - key;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
index 3954145..4a724d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -23,13 +23,13 @@ import java.io.IOException;
* Encodes and decodes column names / row keys which are long.
*/
public final class LongKeyConverter implements KeyConverter<Long> {
- private static final LongKeyConverter INSTANCE = new LongKeyConverter();
- public static LongKeyConverter getInstance() {
- return INSTANCE;
- }
+ /**
+ * To delegate the actual work to.
+ */
+ private final LongConverter longConverter = new LongConverter();
- private LongKeyConverter() {
+ public LongKeyConverter() {
}
/*
@@ -44,7 +44,7 @@ public final class LongKeyConverter implements KeyConverter<Long> {
try {
// IOException will not be thrown here as we are explicitly passing
// Long.
- return LongConverter.getInstance().encodeValue(key);
+ return longConverter.encodeValue(key);
} catch (IOException e) {
return null;
}
@@ -60,7 +60,7 @@ public final class LongKeyConverter implements KeyConverter<Long> {
@Override
public Long decode(byte[] bytes) {
try {
- return (Long) LongConverter.getInstance().decodeValue(bytes);
+ return (Long) longConverter.decodeValue(bytes);
} catch (IOException e) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
new file mode 100644
index 0000000..6159dc7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/RowKeyPrefix.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * In queries where a single result is needed, an exact rowkey can be used
+ * through the corresponding rowkey#getRowKey() method. For queries that need to
+ * scan over a range of rowkeys, a partial rowkey (the initial part) is
+ * used. Classes implementing RowKeyPrefix indicate that they are the initial
+ * part of rowkeys, with different constructors taking fewer arguments to
+ * form a partial rowkey, a prefix.
+ *
+ * @param <R> indicating the type of rowkey that a particular implementation is
+ * a prefix for.
+ */
+public interface RowKeyPrefix<R> {
+
+ /**
+ * Create a row key prefix, meaning a partial rowkey that can be used in range
+ * scans. Which fields are included in the prefix will depend on the
+ * constructor of the specific instance that was used. Output depends on which
+ * constructor was used.
+ * @return a prefix of the following form {@code first!second!...!last!}
+ */
+ byte[] getRowKeyPrefix();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
index b0f6d55..282848e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
@@ -24,13 +24,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
* added later, if required in the associated ColumnPrefix implementations.
*/
public final class StringKeyConverter implements KeyConverter<String> {
- private static final StringKeyConverter INSTANCE = new StringKeyConverter();
- public static StringKeyConverter getInstance() {
- return INSTANCE;
- }
-
- private StringKeyConverter() {
+ public StringKeyConverter() {
}
/*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index d52a5d7..aa9a793 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,14 +18,12 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,13 +35,10 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
@@ -52,7 +47,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@@ -73,19 +67,6 @@ public final class TimelineStorageUtils {
public static final long MILLIS_ONE_DAY = 86400000L;
/**
- * Converts a timestamp into it's inverse timestamp to be used in (row) keys
- * where we want to have the most recent timestamp in the top of the table
- * (scans start at the most recent timestamp first).
- *
- * @param key value to be inverted so that the latest version will be first in
- * a scan.
- * @return inverted long
- */
- public static long invertLong(long key) {
- return Long.MAX_VALUE - key;
- }
-
- /**
* Converts an int into it's inverse int to be used in (row) keys
* where we want to have the largest int value in the top of the table
* (scans start at the largest int first).
@@ -164,66 +145,6 @@ public final class TimelineStorageUtils {
}
/**
- * checks if an application has finished.
- *
- * @param te TimlineEntity object.
- * @return true if application has finished else false
- */
- public static boolean isApplicationFinished(TimelineEntity te) {
- SortedSet<TimelineEvent> allEvents = te.getEvents();
- if ((allEvents != null) && (allEvents.size() > 0)) {
- TimelineEvent event = allEvents.last();
- if (event.getId().equals(
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Check if we have a certain field amongst fields to retrieve. This method
- * checks against {@link Field#ALL} as well because that would mean field
- * passed needs to be matched.
- *
- * @param fieldsToRetrieve fields to be retrieved.
- * @param requiredField fields to be checked in fieldsToRetrieve.
- * @return true if has the required field, false otherwise.
- */
- public static boolean hasField(EnumSet<Field> fieldsToRetrieve,
- Field requiredField) {
- return fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(requiredField);
- }
-
- /**
- * Checks if the input TimelineEntity object is an ApplicationEntity.
- *
- * @param te TimelineEntity object.
- * @return true if input is an ApplicationEntity, false otherwise
- */
- public static boolean isApplicationEntity(TimelineEntity te) {
- return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
- }
-
- /**
- * @param te TimelineEntity object.
- * @param eventId event with this id needs to be fetched
- * @return TimelineEvent if TimelineEntity contains the desired event.
- */
- public static TimelineEvent getApplicationEvent(TimelineEntity te,
- String eventId) {
- if (isApplicationEntity(te)) {
- for (TimelineEvent event : te.getEvents()) {
- if (event.getId().equals(eventId)) {
- return event;
- }
- }
- }
- return null;
- }
-
- /**
* Returns the first seen aggregation operation as seen in the list of input
* tags or null otherwise.
*
@@ -646,98 +567,6 @@ public final class TimelineStorageUtils {
return appId;
}
- /**
- * Helper method for reading relationship.
- *
- * @param <T> Describes the type of column prefix.
- * @param entity entity to fill.
- * @param result result from HBase.
- * @param prefix column prefix.
- * @param isRelatedTo if true, means relationship is to be added to
- * isRelatedTo, otherwise its added to relatesTo.
- * @throws IOException if any problem is encountered while reading result.
- */
- public static <T> void readRelationship(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isRelatedTo) throws IOException {
- // isRelatedTo and relatesTo are of type Map<String, Set<String>>
- Map<String, Object> columns =
- prefix.readResults(result, StringKeyConverter.getInstance());
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- for (String id : Separator.VALUES.splitEncoded(
- column.getValue().toString())) {
- if (isRelatedTo) {
- entity.addIsRelatedToEntity(column.getKey(), id);
- } else {
- entity.addRelatesToEntity(column.getKey(), id);
- }
- }
- }
- }
-
- /**
- * Helper method for reading key-value pairs for either info or config.
- *
- * @param <T> Describes the type of column prefix.
- * @param entity entity to fill.
- * @param result result from HBase.
- * @param prefix column prefix.
- * @param isConfig if true, means we are reading configs, otherwise info.
- * @throws IOException if any problem is encountered while reading result.
- */
- public static <T> void readKeyValuePairs(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isConfig) throws IOException {
- // info and configuration are of type Map<String, Object or String>
- Map<String, Object> columns =
- prefix.readResults(result, StringKeyConverter.getInstance());
- if (isConfig) {
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- entity.addConfig(column.getKey(), column.getValue().toString());
- }
- } else {
- entity.addInfo(columns);
- }
- }
-
- /**
- * Read events from the entity table or the application table. The column name
- * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
- * if there is no info associated with the event.
- *
- * @param <T> Describes the type of column prefix.
- * @param entity entity to fill.
- * @param result HBase Result.
- * @param prefix column prefix.
- * @throws IOException if any problem is encountered while reading result.
- */
- public static <T> void readEvents(TimelineEntity entity, Result result,
- ColumnPrefix<T> prefix) throws IOException {
- Map<String, TimelineEvent> eventsMap = new HashMap<>();
- Map<EventColumnName, Object> eventsResult =
- prefix.readResults(result, EventColumnNameConverter.getInstance());
- for (Map.Entry<EventColumnName, Object>
- eventResult : eventsResult.entrySet()) {
- EventColumnName eventColumnName = eventResult.getKey();
- String key = eventColumnName.getId() +
- Long.toString(eventColumnName.getTimestamp());
- // Retrieve previously seen event to add to it
- TimelineEvent event = eventsMap.get(key);
- if (event == null) {
- // First time we're seeing this event, add it to the eventsMap
- event = new TimelineEvent();
- event.setId(eventColumnName.getId());
- event.setTimestamp(eventColumnName.getTimestamp());
- eventsMap.put(key, event);
- }
- if (eventColumnName.getInfoKey() != null) {
- event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
- }
- }
- Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
- entity.addEvents(eventsSet);
- }
-
public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
Configuration conf) {
String regionTableName = hRegionInfo.getTable().getNameAsString();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 775879a..93b4b36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -49,8 +49,7 @@ public enum EntityColumn implements Column<EntityTable> {
/**
* When the entity was created.
*/
- CREATED_TIME(EntityColumnFamily.INFO, "created_time",
- LongConverter.getInstance()),
+ CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()),
/**
* The version of the flow that this entity belongs to.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 02a4bb3..e410549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -67,8 +67,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
/**
* Metrics are stored with the metric name as the column name.
*/
- METRIC(EntityColumnFamily.METRICS, null,
- LongConverter.getInstance());
+ METRIC(EntityColumnFamily.METRICS, null, new LongConverter());
private final ColumnHelper<EntityTable> column;
private final ColumnFamily<EntityTable> columnFamily;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 6d08390..ff22178 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -17,6 +17,12 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
/**
* Represents a rowkey for the entity table.
*/
@@ -28,6 +34,8 @@ public class EntityRowKey {
private final String appId;
private final String entityType;
private final String entityId;
+ private final KeyConverter<EntityRowKey> entityRowKeyConverter =
+ new EntityRowKeyConverter();
public EntityRowKey(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType, String entityId) {
@@ -69,61 +77,14 @@ public class EntityRowKey {
}
/**
- * Constructs a row key prefix for the entity table as follows:
- * {@code userName!clusterId!flowName!flowRunId!AppId}.
- *
- * @param clusterId Context cluster id.
- * @param userId User name.
- * @param flowName Flow name.
- * @param flowRunId Run Id for the flow.
- * @param appId Application Id.
- * @return byte array with the row key prefix.
- */
- public static byte[] getRowKeyPrefix(String clusterId, String userId,
- String flowName, Long flowRunId, String appId) {
- return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
- clusterId, userId, flowName, flowRunId, appId, null, null));
- }
-
- /**
- * Constructs a row key prefix for the entity table as follows:
- * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
- * Typically used while querying multiple entities of a particular entity
- * type.
- *
- * @param clusterId Context cluster id.
- * @param userId User name.
- * @param flowName Flow name.
- * @param flowRunId Run Id for the flow.
- * @param appId Application Id.
- * @param entityType Entity type.
- * @return byte array with the row key prefix.
- */
- public static byte[] getRowKeyPrefix(String clusterId, String userId,
- String flowName, Long flowRunId, String appId, String entityType) {
- return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
- clusterId, userId, flowName, flowRunId, appId, entityType, null));
- }
-
- /**
* Constructs a row key for the entity table as follows:
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
* Typically used while querying a specific entity.
*
- * @param clusterId Context cluster id.
- * @param userId User name.
- * @param flowName Flow name.
- * @param flowRunId Run Id for the flow.
- * @param appId Application Id.
- * @param entityType Entity type.
- * @param entityId Entity Id.
* @return byte array with the row key.
*/
- public static byte[] getRowKey(String clusterId, String userId,
- String flowName, Long flowRunId, String appId, String entityType,
- String entityId) {
- return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
- clusterId, userId, flowName, flowRunId, appId, entityType, entityId));
+ public byte[] getRowKey() {
+ return entityRowKeyConverter.encode(this);
}
/**
@@ -133,6 +94,132 @@ public class EntityRowKey {
* @return An <cite>EntityRowKey</cite> object.
*/
public static EntityRowKey parseRowKey(byte[] rowKey) {
- return EntityRowKeyConverter.getInstance().decode(rowKey);
+ return new EntityRowKeyConverter().decode(rowKey);
+ }
+
+ /**
+ * Encodes and decodes row key for entity table. The row key is of the form :
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId. flowRunId
+ * is a long, appId is encoded/decoded using {@link AppIdKeyConverter} and
+ * rest are strings.
+ * <p>
+ */
+ final private static class EntityRowKeyConverter implements
+ KeyConverter<EntityRowKey> {
+
+ private final AppIdKeyConverter appIDKeyConverter = new AppIdKeyConverter();
+
+ private EntityRowKeyConverter() {
+ }
+
+ /**
+ * Entity row key is of the form
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
+ * segment separated by !. The sizes below indicate sizes of each one of
+ * these segments in sequence. clusterId, userName, flowName, entityType and
+ * entityId are strings. flowrunId is a long hence 8 bytes in size. app id
+ * is represented as 12 bytes with cluster timestamp part of appid being 8
+ * bytes (long) and seq id being 4 bytes(int). Strings are variable in size
+ * (i.e. end whenever separator is encountered). This is used while decoding
+ * and helps in determining where to split.
+ */
+ private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+ Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+ AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+ Separator.VARIABLE_SIZE };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes EntityRowKey object into a byte array with each component/field
+ * in EntityRowKey separated by Separator#QUALIFIERS. This leads to an
+ * entity table row key of the form
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId If
+ * entityType in passed EntityRowKey object is null (and the fields
+ * preceding it i.e. clusterId, userId and flowName, flowRunId and appId
+ * are not null), this returns a row key prefix of the form
+ * userName!clusterId!flowName!flowRunId!appId! and if entityId in
+ * EntityRowKey is null (other 6 components are not null), this returns a
+ * row key prefix of the form
+ * userName!clusterId!flowName!flowRunId!appId!entityType! flowRunId is
+ * inverted while encoding as it helps maintain a descending order for row
+ * keys in entity table.
+ *
+ * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#encode(java.lang.Object)
+ */
+ @Override
+ public byte[] encode(EntityRowKey rowKey) {
+ byte[] user =
+ Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS);
+ byte[] cluster =
+ Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+ Separator.TAB, Separator.QUALIFIERS);
+ byte[] flow =
+ Separator.encode(rowKey.getFlowName(), Separator.SPACE,
+ Separator.TAB, Separator.QUALIFIERS);
+ byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
+ // Note that flowRunId is a long, so we can't encode them all at the same
+ // time.
+ byte[] second =
+ Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+ byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
+ if (rowKey.getEntityType() == null) {
+ return Separator.QUALIFIERS.join(first, second, third,
+ Separator.EMPTY_BYTES);
+ }
+ byte[] entityType =
+ Separator.encode(rowKey.getEntityType(), Separator.SPACE,
+ Separator.TAB, Separator.QUALIFIERS);
+ byte[] entityId =
+ rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
+ .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS);
+ byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
+ return Separator.QUALIFIERS.join(first, second, third, fourth);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Decodes an entity row key of the form
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+ * represented in byte format and converts it into an EntityRowKey object.
+ * flowRunId is inverted while decoding as it was inverted while encoding.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#decode(byte[])
+ */
+ @Override
+ public EntityRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 7) {
+ throw new IllegalArgumentException("the row key is not valid for "
+ + "an entity");
+ }
+ String userId =
+ Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String clusterId =
+ Separator.decode(Bytes.toString(rowKeyComponents[1]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String flowName =
+ Separator.decode(Bytes.toString(rowKeyComponents[2]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ Long flowRunId =
+ LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+ String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
+ String entityType =
+ Separator.decode(Bytes.toString(rowKeyComponents[5]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String entityId =
+ Separator.decode(Bytes.toString(rowKeyComponents[6]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+ entityType, entityId);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
deleted file mode 100644
index 43c0569..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for entity table.
- * The row key is of the form :
- * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
- * flowRunId is a long, appId is encoded/decoded using
- * {@link AppIdKeyConverter} and rest are strings.
- */
-public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
- private static final EntityRowKeyConverter INSTANCE =
- new EntityRowKeyConverter();
-
- public static EntityRowKeyConverter getInstance() {
- return INSTANCE;
- }
-
- private EntityRowKeyConverter() {
- }
-
- // Entity row key is of the form
- // userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
- // segment separated by !. The sizes below indicate sizes of each one of these
- // segements in sequence. clusterId, userName, flowName, entityType and
- // entityId are strings. flowrunId is a long hence 8 bytes in size. app id is
- // represented as 12 bytes with cluster timestamp part of appid being 8 bytes
- // (long) and seq id being 4 bytes(int).
- // Strings are variable in size (i.e. end whenever separator is encountered).
- // This is used while decoding and helps in determining where to split.
- private static final int[] SEGMENT_SIZES = {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(),
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
-
- /*
- * (non-Javadoc)
- *
- * Encodes EntityRowKey object into a byte array with each component/field in
- * EntityRowKey separated by Separator#QUALIFIERS. This leads to an entity
- * table row key of the form
- * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
- * If entityType in passed EntityRowKey object is null (and the fields
- * preceding it i.e. clusterId, userId and flowName, flowRunId and appId are
- * not null), this returns a row key prefix of the form
- * userName!clusterId!flowName!flowRunId!appId! and if entityId in
- * EntityRowKey is null (other 6 components are not null), this returns a row
- * key prefix of the form
- * userName!clusterId!flowName!flowRunId!appId!entityType!
- * flowRunId is inverted while encoding as it helps maintain a descending
- * order for row keys in entity table.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #encode(java.lang.Object)
- */
- @Override
- public byte[] encode(EntityRowKey rowKey) {
- byte[] user = Separator.encode(rowKey.getUserId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] cluster = Separator.encode(rowKey.getClusterId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] flow = Separator.encode(rowKey.getFlowName(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
- rowKey.getFlowRunId()));
- byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
- if (rowKey.getEntityType() == null) {
- return Separator.QUALIFIERS.join(
- first, second, third, Separator.EMPTY_BYTES);
- }
- byte[] entityType = Separator.encode(rowKey.getEntityType(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
- byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES :
- Separator.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS);
- byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
- return Separator.QUALIFIERS.join(first, second, third, fourth);
- }
-
- /*
- * (non-Javadoc)
- *
- * Decodes an application row key of the form
- * userName!clusterId!flowName!flowRunId!appId!entityType!entityId represented
- * in byte format and converts it into an EntityRowKey object. flowRunId is
- * inverted while decoding as it was inverted while encoding.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #decode(byte[])
- */
- @Override
- public EntityRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 7) {
- throw new IllegalArgumentException("the row key is not valid for " +
- "an entity");
- }
- String userId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- Long flowRunId =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
- String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
- String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String entityId =Separator.decode(Bytes.toString(rowKeyComponents[6]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
- entityType, entityId);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
new file mode 100644
index 0000000..9146180
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey without the entityId or without entityType and
+ * entityId for the entity table.
+ *
+ */
+public class EntityRowKeyPrefix extends EntityRowKey implements
+ RowKeyPrefix<EntityRowKey> {
+
+ /**
+ * Creates a prefix which generates the following rowKeyPrefixes for the
+ * entity table:
+ * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
+ *
+ * @param clusterId identifying the cluster
+ * @param userId identifying the user
+ * @param flowName identifying the flow
+ * @param flowRunId identifying the individual run of this flow
+ * @param appId identifying the application
+ * @param entityType which entity type
+ */
+ public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+ Long flowRunId, String appId, String entityType) {
+ super(clusterId, userId, flowName, flowRunId, appId, entityType, null);
+ }
+
+ /**
+ * Creates a prefix which generates the following rowKeyPrefixes for the
+ * entity table (entityType and entityId are both left unset):
+ * {@code userName!clusterId!flowName!flowRunId!AppId!}.
+ *
+ * @param clusterId identifying the cluster
+ * @param userId identifying the user
+ * @param flowName identifying the flow
+ * @param flowRunId identifying the individual run of this flow
+ * @param appId identifying the application
+ */
+ public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+ Long flowRunId, String appId) {
+ super(clusterId, userId, flowName, flowRunId, appId, null, null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * RowKeyPrefix#getRowKeyPrefix()
+ */
+ public byte[] getRowKeyPrefix() {
+ return super.getRowKey();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index eea38a5..d10608a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
@@ -28,11 +32,37 @@ public class FlowActivityRowKey {
private final Long dayTs;
private final String userId;
private final String flowName;
+ private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
+ new FlowActivityRowKeyConverter();
+ /**
+ * @param clusterId identifying the cluster
+ * @param dayTs to be converted to the top of the day timestamp
+ * @param userId identifying user
+ * @param flowName identifying the flow
+ */
public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
String flowName) {
+ this(clusterId, dayTs, userId, flowName, true);
+ }
+
+ /**
+ * @param clusterId identifying the cluster
+ * @param timestamp when the flow activity happened. May be converted to the
+ * top of the day depending on the convertDayTsToTopOfDay argument.
+ * @param userId identifying user
+ * @param flowName identifying the flow
+ * @param convertDayTsToTopOfDay if true and timestamp isn't null, then
+ * timestamp will be converted to the top-of-the day timestamp
+ */
+ protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
+ String flowName, boolean convertDayTsToTopOfDay) {
this.clusterId = clusterId;
- this.dayTs = dayTs;
+ if (convertDayTsToTopOfDay && (timestamp != null)) {
+ this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+ } else {
+ this.dayTs = timestamp;
+ }
this.userId = userId;
this.flowName = flowName;
}
@@ -54,46 +84,13 @@ public class FlowActivityRowKey {
}
/**
- * Constructs a row key prefix for the flow activity table as follows:
- * {@code clusterId!}.
- *
- * @param clusterId Cluster Id.
- * @return byte array with the row key prefix
- */
- public static byte[] getRowKeyPrefix(String clusterId) {
- return FlowActivityRowKeyConverter.getInstance().encode(
- new FlowActivityRowKey(clusterId, null, null, null));
- }
-
- /**
- * Constructs a row key prefix for the flow activity table as follows:
- * {@code clusterId!dayTimestamp!}.
- *
- * @param clusterId Cluster Id.
- * @param dayTs Start of the day timestamp.
- * @return byte array with the row key prefix
- */
- public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
- return FlowActivityRowKeyConverter.getInstance().encode(
- new FlowActivityRowKey(clusterId, dayTs, null, null));
- }
-
- /**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowName}.
*
- * @param clusterId Cluster Id.
- * @param eventTs event's TimeStamp.
- * @param userId User Id.
- * @param flowName Flow Name.
* @return byte array for the row key
*/
- public static byte[] getRowKey(String clusterId, long eventTs, String userId,
- String flowName) {
- // convert it to Day's time stamp
- eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
- return FlowActivityRowKeyConverter.getInstance().encode(
- new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
+ public byte[] getRowKey() {
+ return flowActivityRowKeyConverter.encode(this);
}
/**
@@ -103,6 +100,97 @@ public class FlowActivityRowKey {
* @return A <cite>FlowActivityRowKey</cite> object.
*/
public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
- return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
+ return new FlowActivityRowKeyConverter().decode(rowKey);
+ }
+
+ /**
+ * Encodes and decodes row key for flow activity table. The row key is of the
+ * form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day
+ * timestamp) is a long and rest are strings.
+ * dayTimestamp is stored inverted and is inverted back when decoding.
+ */
+ final private static class FlowActivityRowKeyConverter implements
+ KeyConverter<FlowActivityRowKey> {
+
+ private FlowActivityRowKeyConverter() {
+ }
+
+ /**
+ * The flow activity row key is of the form
+ * clusterId!dayTimestamp!user!flowName with each segment separated by !.
+ * The sizes below indicate sizes of each one of these segments in
+ * sequence. clusterId, user and flowName are strings. Top of the day
+ * timestamp is a long hence 8 bytes in size. Strings are variable in size
+ * (i.e. they end whenever separator is encountered). This is used while
+ * decoding and helps in determining where to split.
+ */
+ private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+ Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes FlowActivityRowKey object into a byte array with each
+ * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
+ * This leads to a flow activity table row key of the form
+ * clusterId!dayTimestamp!user!flowName. If dayTimestamp in passed
+ * FlowActivityRowKey object is null and clusterId is not null, then this
+ * returns a row key prefix as clusterId! and if userId in
+ * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId
+ * and dayTimestamp are not null), this returns a row key prefix as
+ * clusterId!dayTimestamp!. dayTimestamp is inverted while encoding as it
+ * helps maintain a descending order for row keys in flow activity table.
+ *
+ * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#encode(java.lang.Object)
+ */
+ @Override
+ public byte[] encode(FlowActivityRowKey rowKey) {
+ if (rowKey.getDayTimestamp() == null) {
+ return Separator.QUALIFIERS.join(Separator.encode(
+ rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS), Separator.EMPTY_BYTES);
+ }
+ if (rowKey.getUserId() == null) {
+ return Separator.QUALIFIERS.join(Separator.encode(
+ rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS), Bytes.toBytes(LongConverter
+ .invertLong(rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
+ }
+ return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Bytes
+ .toBytes(LongConverter.invertLong(rowKey.getDayTimestamp())),
+ Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+ }
+
+ /*
+ * (non-Javadoc) Decodes a clusterId!dayTimestamp!user!flowName row key,
+ * undoing the dayTimestamp inversion; rejects keys without 4 segments.
+ *
+ * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#decode(byte[])
+ */
+ @Override
+ public FlowActivityRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 4) {
+ throw new IllegalArgumentException("the row key is not valid for "
+ + "a flow activity");
+ }
+ String clusterId =
+ Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ Long dayTs = LongConverter.invertLong(Bytes.toLong(rowKeyComponents[1]));
+ String userId =
+ Separator.decode(Bytes.toString(rowKeyComponents[2]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String flowName =
+ Separator.decode(Bytes.toString(rowKeyComponents[3]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
deleted file mode 100644
index 9dc4c98..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for flow activity table.
- * The row key is of the form : clusterId!dayTimestamp!user!flowName.
- * dayTimestamp(top of the day timestamp) is a long and rest are strings.
- */
-public final class FlowActivityRowKeyConverter implements
- KeyConverter<FlowActivityRowKey> {
- private static final FlowActivityRowKeyConverter INSTANCE =
- new FlowActivityRowKeyConverter();
-
- public static FlowActivityRowKeyConverter getInstance() {
- return INSTANCE;
- }
-
- private FlowActivityRowKeyConverter() {
- }
-
- // Flow activity row key is of the form clusterId!dayTimestamp!user!flowName
- // with each segment separated by !. The sizes below indicate sizes of each
- // one of these segements in sequence. clusterId, user and flowName are
- // strings. Top of the day timestamp is a long hence 8 bytes in size.
- // Strings are variable in size (i.e. end whenever separator is encountered).
- // This is used while decoding and helps in determining where to split.
- private static final int[] SEGMENT_SIZES = {
- Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE };
-
- /*
- * (non-Javadoc)
- *
- * Encodes FlowActivityRowKey object into a byte array with each
- * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
- * This leads to an flow activity table row key of the form
- * clusterId!dayTimestamp!user!flowName
- * If dayTimestamp in passed FlowActivityRowKey object is null and clusterId
- * is not null, this returns a row key prefix as clusterId! and if userId in
- * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId and
- * dayTimestamp are not null), this returns a row key prefix as
- * clusterId!dayTimeStamp!
- * dayTimestamp is inverted while encoding as it helps maintain a descending
- * order for row keys in flow activity table.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #encode(java.lang.Object)
- */
-
- @Override
- public byte[] encode(FlowActivityRowKey rowKey) {
- if (rowKey.getDayTimestamp() == null) {
- return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
- Separator.EMPTY_BYTES);
- }
- if (rowKey.getUserId() == null) {
- return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
- Bytes.toBytes(TimelineStorageUtils.invertLong(
- rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
- }
- return Separator.QUALIFIERS.join(
- Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS),
- Bytes.toBytes(
- TimelineStorageUtils.invertLong(rowKey.getDayTimestamp())),
- Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS),
- Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS));
- }
-
- @Override
- public FlowActivityRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 4) {
- throw new IllegalArgumentException("the row key is not valid for "
- + "a flow activity");
- }
- String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- Long dayTs =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
- String userId = Separator.decode(Bytes.toString(rowKeyComponents[2]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String flowName = Separator.decode(Bytes.toString(rowKeyComponents[3]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
new file mode 100644
index 0000000..eb88e54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * A prefix partial rowkey for flow activities.
+ */
+public class FlowActivityRowKeyPrefix extends FlowActivityRowKey implements
+ RowKeyPrefix<FlowActivityRowKey> {
+
+ /**
+ * Constructs a row key prefix for the flow activity table as follows:
+ * {@code clusterId!dayTimestamp!}.
+ *
+ * @param clusterId Cluster Id.
+ * @param dayTs Start of the day timestamp; used as given (not converted).
+ */
+ public FlowActivityRowKeyPrefix(String clusterId, Long dayTs) {
+ super(clusterId, dayTs, null, null, false);
+ }
+
+ /**
+ * Constructs a row key prefix for the flow activity table as follows:
+ * {@code clusterId!}.
+ *
+ * @param clusterId identifying the cluster
+ */
+ public FlowActivityRowKeyPrefix(String clusterId) {
+ super(clusterId, null, null, null, false);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * RowKeyPrefix#getRowKeyPrefix()
+ */
+ public byte[] getRowKeyPrefix() {
+ return super.getRowKey();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index f1553b8..2e7a9d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
/**
@@ -41,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
* application start times.
*/
MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
- AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
+ AggregationOperation.GLOBAL_MIN, new LongConverter()),
/**
* When the flow ended. This is the maximum of currently known application end
* times.
*/
MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
- AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
+ AggregationOperation.GLOBAL_MAX, new LongConverter()),
/**
* The version of the flow that this flow belongs to.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 0f14c89..e74282a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -41,7 +41,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
/**
* To store flow run info values.
*/
- METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
+ METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter());
private final ColumnHelper<FlowRunTable> column;
private final ColumnFamily<FlowRunTable> columnFamily;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 925242b..8fda9a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
/**
* Represents a rowkey for the flow run table.
*/
@@ -25,6 +30,8 @@ public class FlowRunRowKey {
private final String userId;
private final String flowName;
private final Long flowRunId;
+ private final FlowRunRowKeyConverter flowRunRowKeyConverter =
+ new FlowRunRowKeyConverter();
public FlowRunRowKey(String clusterId, String userId, String flowName,
Long flowRunId) {
@@ -51,36 +58,16 @@ public class FlowRunRowKey {
}
/**
- * Constructs a row key prefix for the flow run table as follows: {
- * clusterId!userI!flowName!}.
- *
- * @param clusterId Cluster Id.
- * @param userId User Id.
- * @param flowName Flow Name.
- * @return byte array with the row key prefix
- */
- public static byte[] getRowKeyPrefix(String clusterId, String userId,
- String flowName) {
- return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
- clusterId, userId, flowName, null));
- }
-
- /**
* Constructs a row key for the flow run table as follows: {
* clusterId!userId!flowName!Inverted Flow Run Id}.
*
- * @param clusterId Cluster Id.
- * @param userId User Id.
- * @param flowName Flow Name.
- * @param flowRunId Run Id for the flow name.
* @return byte array with the row key
*/
- public static byte[] getRowKey(String clusterId, String userId,
- String flowName, Long flowRunId) {
- return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
- clusterId, userId, flowName, flowRunId));
+ public byte[] getRowKey() {
+ return flowRunRowKeyConverter.encode(this);
}
+
/**
* Given the raw row key as bytes, returns the row key as an object.
*
@@ -88,7 +75,7 @@ public class FlowRunRowKey {
* @return A <cite>FlowRunRowKey</cite> object.
*/
public static FlowRunRowKey parseRowKey(byte[] rowKey) {
- return FlowRunRowKeyConverter.getInstance().decode(rowKey);
+ return new FlowRunRowKeyConverter().decode(rowKey);
}
/**
@@ -106,4 +93,98 @@ public class FlowRunRowKey {
flowKeyStr.append("}");
return flowKeyStr.toString();
}
+
+ /**
+ * Encodes and decodes row key for flow run table.
+ * The row key is of the form : clusterId!userId!flowName!flowrunId.
+ * flowrunId is a long and rest are strings.
+ * <p>
+ */
+ final private static class FlowRunRowKeyConverter implements
+ KeyConverter<FlowRunRowKey> {
+
+ private FlowRunRowKeyConverter() {
+ }
+
+ /**
+ * The flow run row key is of the form clusterId!userId!flowName!flowrunId
+ * with each segment separated by !. The sizes below indicate sizes of each
+ * one of these segments in sequence. clusterId, userId and flowName are
+ * strings. flowrunId is a long hence 8 bytes in size. Strings are variable
+ * in size (i.e. end whenever separator is encountered). This is used while
+ * decoding and helps in determining where to split.
+ */
+ private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+ Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes FlowRunRowKey object into a byte array with each component/field
+ * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a flow
+ * run row key of the form clusterId!userId!flowName!flowrunId. If flowRunId
+ * in passed FlowRunRowKey object is null (and the fields preceding it i.e.
+ * clusterId, userId and flowName are not null), this returns a row key
+ * prefix of the form clusterId!userId!flowName!. flowRunId is inverted
+ * while encoding as it helps maintain a descending order for flow keys in
+ * flow run table.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#encode(java.lang.Object)
+ */
+ @Override
+ public byte[] encode(FlowRunRowKey rowKey) {
+ byte[] first =
+ Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator
+ .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+ if (rowKey.getFlowRunId() == null) {
+ return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+ } else {
+ // Note that flowRunId is a long, so we can't encode them all at the
+ // same
+ // time.
+ byte[] second =
+ Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+ return Separator.QUALIFIERS.join(first, second);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Decodes a flow run row key of the form
+ * clusterId!userId!flowName!flowrunId represented in byte format and
+ * converts it into a FlowRunRowKey object. flowRunId is inverted while
+ * decoding as it was inverted while encoding.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common
+ * .KeyConverter#decode(byte[])
+ */
+ @Override
+ public FlowRunRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 4) {
+ throw new IllegalArgumentException("the row key is not valid for "
+ + "a flow run");
+ }
+ String clusterId =
+ Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String userId =
+ Separator.decode(Bytes.toString(rowKeyComponents[1]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String flowName =
+ Separator.decode(Bytes.toString(rowKeyComponents[2]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ Long flowRunId =
+ LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+ return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
deleted file mode 100644
index 642f065..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for flow run table.
- * The row key is of the form : clusterId!userId!flowName!flowrunId.
- * flowrunId is a long and rest are strings.
- */
-public final class FlowRunRowKeyConverter implements
- KeyConverter<FlowRunRowKey> {
- private static final FlowRunRowKeyConverter INSTANCE =
- new FlowRunRowKeyConverter();
-
- public static FlowRunRowKeyConverter getInstance() {
- return INSTANCE;
- }
-
- private FlowRunRowKeyConverter() {
- }
-
- // Flow run row key is of the form
- // clusterId!userId!flowName!flowrunId with each segment separated by !.
- // The sizes below indicate sizes of each one of these segments in sequence.
- // clusterId, userId and flowName are strings. flowrunId is a long hence 8
- // bytes in size. Strings are variable in size (i.e. end whenever separator is
- // encountered). This is used while decoding and helps in determining where to
- // split.
- private static final int[] SEGMENT_SIZES = {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Bytes.SIZEOF_LONG };
-
- /*
- * (non-Javadoc)
- *
- * Encodes FlowRunRowKey object into a byte array with each component/field in
- * FlowRunRowKey separated by Separator#QUALIFIERS. This leads to an
- * flow run row key of the form clusterId!userId!flowName!flowrunId
- * If flowRunId in passed FlowRunRowKey object is null (and the fields
- * preceding it i.e. clusterId, userId and flowName are not null), this
- * returns a row key prefix of the form clusterId!userName!flowName!
- * flowRunId is inverted while encoding as it helps maintain a descending
- * order for flow keys in flow run table.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #encode(java.lang.Object)
- */
- @Override
- public byte[] encode(FlowRunRowKey rowKey) {
- byte[] first = Separator.QUALIFIERS.join(
- Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS),
- Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS),
- Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS));
- if (rowKey.getFlowRunId() == null) {
- return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
- } else {
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
- rowKey.getFlowRunId()));
- return Separator.QUALIFIERS.join(first, second);
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * Decodes an flow run row key of the form
- * clusterId!userId!flowName!flowrunId represented in byte format and converts
- * it into an FlowRunRowKey object. flowRunId is inverted while decoding as
- * it was inverted while encoding.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
- * #decode(byte[])
- */
- @Override
- public FlowRunRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 4) {
- throw new IllegalArgumentException("the row key is not valid for " +
- "a flow run");
- }
- String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- Long flowRunId =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
- return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
new file mode 100644
index 0000000..23ebc66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey (without the flowRunId) for the flow run table.
+ */
+public class FlowRunRowKeyPrefix extends FlowRunRowKey implements
+ RowKeyPrefix<FlowRunRowKey> {
+
+ /**
+ * Constructs a row key prefix for the flow run table as follows:
+ * {@code clusterId!userId!flowName!}.
+ *
+ * @param clusterId identifying the cluster
+ * @param userId identifying the user
+ * @param flowName identifying the flow
+ */
+ public FlowRunRowKeyPrefix(String clusterId, String userId,
+ String flowName) {
+ super(clusterId, userId, flowName, null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * RowKeyPrefix#getRowKeyPrefix()
+ */
+ public byte[] getRowKeyPrefix() {
+ // We know we're a FlowRunRowKey with null flowRunId, so we can simply
+ // delegate
+ return super.getRowKey();
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org