You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:49:22 UTC
[36/50] [abbrv] hadoop git commit: YARN-5109. timestamps are stored
unencoded causing parse errors (Varun Saxena via sjlee)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
new file mode 100644
index 0000000..32ef1c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Converts between {@link EventColumnName} objects and the byte form of event
+ * column names used for application and entity tables.
+ * A complete event column name has the form eventId=timestamp=infokey.
+ * When no info is associated with the event, the name has the form
+ * eventId=timestamp= (the trailing component is empty).
+ * The event timestamp is a long; the other components are strings.
+ * Column prefixes are not part of the event column name handled here. They
+ * are added later, if required, in the associated ColumnPrefix
+ * implementations.
+ */
+public final class EventColumnNameConverter
+    implements KeyConverter<EventColumnName> {
+
+  // eventId=timestamp=infokey are of types String, Long, String.
+  // Strings are variable in size (i.e. they end whenever the separator is
+  // encountered) while the timestamp takes Bytes.SIZEOF_LONG bytes.
+  // These sizes are handed to the split done while decoding.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE };
+
+  private static final EventColumnNameConverter INSTANCE =
+      new EventColumnNameConverter();
+
+  private EventColumnNameConverter() {
+  }
+
+  public static EventColumnNameConverter getInstance() {
+    return INSTANCE;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes an EventColumnName into a byte array in which the components are
+   * joined by Separator#VALUES, i.e. eventId=timestamp=infokey.
+   * The event id and info key are encoded for SPACE, TAB and VALUES
+   * separators; the timestamp is inverted and written as a long.
+   * If the timestamp is null, the event id is joined with empty bytes only
+   * (a column prefix of the form eventId=). If the info key is null, empty
+   * bytes take its place (a column name of the form eventId=timestamp=).
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(EventColumnName key) {
+    final byte[] idBytes = Separator.encode(key.getId(), Separator.SPACE,
+        Separator.TAB, Separator.VALUES);
+    final Long timestamp = key.getTimestamp();
+    if (timestamp == null) {
+      // Only the event id is known: produce the prefix eventId=
+      return Separator.VALUES.join(idBytes, Separator.EMPTY_BYTES);
+    }
+    final byte[] timestampBytes =
+        Bytes.toBytes(TimelineStorageUtils.invertLong(timestamp));
+    final String infoKey = key.getInfoKey();
+    final byte[] infoKeyBytes;
+    if (infoKey == null) {
+      infoKeyBytes = Separator.EMPTY_BYTES;
+    } else {
+      infoKeyBytes = Separator.encode(infoKey, Separator.SPACE, Separator.TAB,
+          Separator.VALUES);
+    }
+    return Separator.VALUES.join(idBytes, timestampBytes, infoKeyBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes the byte form of an event column name (eventId=timestamp= or
+   * eventId=timestamp=infoKey) into an EventColumnName object. An empty third
+   * component is decoded as a null info key.
+   * Throws IllegalArgumentException if the bytes do not split into exactly
+   * three components.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public EventColumnName decode(byte[] bytes) {
+    final byte[][] parts = Separator.VALUES.split(bytes, SEGMENT_SIZES);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("the column name is not valid");
+    }
+    final String eventId = Separator.decode(Bytes.toString(parts[0]),
+        Separator.VALUES, Separator.TAB, Separator.SPACE);
+    final Long timestamp =
+        TimelineStorageUtils.invertLong(Bytes.toLong(parts[1]));
+    String infoKey = null;
+    if (parts[2].length != 0) {
+      infoKey = Separator.decode(Bytes.toString(parts[2]),
+          Separator.VALUES, Separator.TAB, Separator.SPACE);
+    }
+    return new EventColumnName(eventId, timestamp, infoKey);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
new file mode 100644
index 0000000..4229e81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Interface which has to be implemented for encoding and decoding row keys and
+ * columns.
+ *
+ * @param <T> type of the key which is converted to and from its byte
+ *          representation.
+ */
+public interface KeyConverter<T> {
+ /**
+ * Encodes a key as a byte array.
+ *
+ * @param key key to be encoded.
+ * @return a byte array representing the key.
+ */
+ byte[] encode(T key);
+
+ /**
+ * Decodes a byte array and returns a key of type T.
+ *
+ * @param bytes byte representation of the key.
+ * @return an object(key) of type T which has been constructed after decoding
+ * the bytes.
+ */
+ T decode(byte[] bytes);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
new file mode 100644
index 0000000..3954145
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+/**
+ * Encodes and decodes column names / row keys which are long.
+ */
+public final class LongKeyConverter implements KeyConverter<Long> {
+ private static final LongKeyConverter INSTANCE = new LongKeyConverter();
+
+ public static LongKeyConverter getInstance() {
+ return INSTANCE;
+ }
+
+ private LongKeyConverter() {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+ * #encode(java.lang.Object)
+ */
+ @Override
+ public byte[] encode(Long key) {
+ try {
+ // IOException will not be thrown here as we are explicitly passing
+ // Long.
+ return LongConverter.getInstance().encodeValue(key);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+ * #decode(byte[])
+ */
+ @Override
+ public Long decode(byte[] bytes) {
+ try {
+ return (Long) LongConverter.getInstance().decodeValue(bytes);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index a81c717..8a178db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -45,7 +45,13 @@ public enum Separator {
* getting a + for a space, which may already occur in strings, so we don't
* want that.
*/
- SPACE(" ", "%2$");
+ SPACE(" ", "%2$"),
+
+ /**
+ * separator in values, often used to avoid having these in qualifiers and
+ * names.
+ */
+ TAB("\t", "%3$");
/**
* The string value of this separator.
@@ -67,7 +73,22 @@ public enum Separator {
*/
private final String quotedValue;
- private static final byte[] EMPTY_BYTES = new byte[0];
+ /**
+ * Indicator for variable size of an individual segment in a split. The
+ * segment ends wherever separator is encountered.
+ * Typically used for string.
+ * Also used to indicate that there is no fixed number of splits which need to
+ * be returned. If split limit is specified as this, all possible splits are
+ * returned.
+ */
+ public static final int VARIABLE_SIZE = 0;
+
+
+ /** empty string. */
+ public static final String EMPTY_STRING = "";
+
+ /** empty bytes. */
+ public static final byte[] EMPTY_BYTES = new byte[0];
/**
* @param value of the separator to use. Cannot be null or empty string.
@@ -222,7 +243,6 @@ public enum Separator {
System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
offset += this.value.length();
}
-
}
return buf;
}
@@ -307,7 +327,25 @@ public enum Separator {
* @return source split by this separator.
*/
public byte[][] split(byte[] source, int limit) {
- return TimelineStorageUtils.split(source, this.bytes, limit);
+ return split(source, this.bytes, limit);
+ }
+
+  /**
+   * Splits the source array into multiple array segments using this
+   * separator. The sizes describe the expected size of each
+   * component/segment, in order.
+   * If this separator occurs inside a fixed-size segment before the specified
+   * size is reached, it is considered part of that segment and scanning
+   * continues until the size is reached.
+   * Variable length strings cannot contain this separator and are indicated
+   * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split are returned.
+   *
+   * @param source byte array to be split.
+   * @param sizes sizes of relative components/segments.
+   * @return source split by this separator as per the sizes specified.
+   */
+  public byte[][] split(byte[] source, int[] sizes) {
+    final byte[] separatorBytes = this.bytes;
+    return split(source, separatorBytes, sizes);
   }
/**
@@ -315,10 +353,158 @@ public enum Separator {
* as many times as splits are found. This will naturally produce copied byte
* arrays for each of the split segments.
*
- * @param source to be split
+ * @param source byte array to be split
* @return source split by this separator.
*/
public byte[][] split(byte[] source) {
- return TimelineStorageUtils.split(source, this.bytes);
+ return split(source, this.bytes);
+ }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   * The sizes indicate the sizes of the relative components/segments.
+   * In case one of the segments contains this separator before the specified
+   * size is reached, the separator will be considered part of that segment and
+   * we will continue till size is reached.
+   * Variable length strings cannot contain this separator and are indicated
+   * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split are returned.
+   *
+   * @param source the source data. If null, an empty list is returned.
+   * @param separator the separator pattern to look for. If null, an empty list
+   *          is returned.
+   * @param sizes indicate the sizes of the relative components/segments. The
+   *          number of entries also limits the number of segments returned.
+   *          If null or empty, there is no limit on the number of segments and
+   *          no size checks are made.
+   * @return a list of ranges.
+   * @throws IllegalArgumentException if a segment is longer than the fixed
+   *           size expected for it.
+   */
+  private static List<Range> splitRanges(byte[] source, byte[] separator,
+      int[] sizes) {
+    List<Range> segments = new ArrayList<Range>();
+    if (source == null || separator == null) {
+      return segments;
+    }
+    // VARIABLE_SIZE here indicates that there is no limit to number of segments
+    // to return.
+    int limit = VARIABLE_SIZE;
+    if (sizes != null && sizes.length > 0) {
+      limit = sizes.length;
+    }
+    int start = 0;
+    int currentSegment = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        // A multi-byte separator may match only partially at the tail of the
+        // source; treat running off the end as "no match" instead of reading
+        // past the array.
+        if (i + j >= source.length || source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > VARIABLE_SIZE) {
+        if (segments.size() >= (limit - 1)) {
+          // everything else goes in one final segment
+          break;
+        }
+        if (sizes != null) {
+          int currentSegExpectedSize = sizes[currentSegment];
+          if (currentSegExpectedSize > VARIABLE_SIZE) {
+            int currentSegSize = i - start;
+            if (currentSegSize < currentSegExpectedSize) {
+              // Segment not yet complete. More bytes to parse.
+              continue itersource;
+            } else if (currentSegSize > currentSegExpectedSize) {
+              // Segment is not as per size.
+              throw new IllegalArgumentException(
+                  "Segments not separated as per expected sizes");
+            }
+          }
+        }
+      }
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+      currentSegment++;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      // The index guard covers a non-null but empty sizes array, for which
+      // there is no expected size to check against.
+      if (sizes != null && currentSegment < sizes.length) {
+        // Check if final segment is as per size specified.
+        if (sizes[currentSegment] > VARIABLE_SIZE &&
+            source.length - start > sizes[currentSegment]) {
+          // Segment is not as per size.
+          throw new IllegalArgumentException(
+              "Segments not separated as per expected sizes");
+        }
+      }
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+ /**
+ * Splits based on segments calculated based on limit/sizes specified for the
+ * separator.
+ *
+ * @param source byte array to be split.
+ * @param segments specifies the range for each segment.
+ * @return a byte[][] split as per the segment ranges.
+ */
+ private static byte[][] split(byte[] source, List<Range> segments) {
+ byte[][] splits = new byte[segments.size()][];
+ for (int i = 0; i < segments.size(); i++) {
+ Range r = segments.get(i);
+ byte[] tmp = new byte[r.length()];
+ if (tmp.length > 0) {
+ System.arraycopy(source, r.start(), tmp, 0, r.length());
+ }
+ splits[i] = tmp;
+ }
+ return splits;
+ }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator and the given segment sizes. The ranges are computed first and
+   * each range is then copied into its own byte array.
+   *
+   * @param source source array.
+   * @param separator separator represented as a byte array.
+   * @param sizes sizes of relative components/segments.
+   * @return byte[][] after splitting the source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator, int[] sizes) {
+    return split(source, splitRanges(source, separator, sizes));
+  }
+
+ /**
+ * Splits the source array into multiple array segments using the given
+ * separator. This will naturally produce copied byte arrays for each of the
+ * split segments.
+ *
+ * @param source Source array.
+ * @param separator Separator represented as a byte array.
+ * @return byte[][] after splitting the source.
+ */
+ private static byte[][] split(byte[] source, byte[] separator) {
+ return split(source, separator, (int[]) null);
+ }
+
+ /**
+ * Splits the source array into multiple array segments using the given
+ * separator, up to a maximum of count items. This will naturally produce
+ * copied byte arrays for each of the split segments.
+ *
+ * @param source Source array.
+ * @param separator Separator represented as a byte array.
+ * @param limit a non-positive value indicates no limit on number of segments.
+ * @return byte[][] after splitting the input source.
+ */
+ private static byte[][] split(byte[] source, byte[] separator, int limit) {
+ int[] sizes = null;
+ if (limit > VARIABLE_SIZE) {
+ sizes = new int[limit];
+ }
+ List<Range> segments = splitRanges(source, separator, sizes);
+ return split(source, segments);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
new file mode 100644
index 0000000..b0f6d55
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Key converter for column names / row keys which are plain strings.
+ * Column prefixes are not part of the column name handled here. They are
+ * added later, if required, in the associated ColumnPrefix implementations.
+ */
+public final class StringKeyConverter implements KeyConverter<String> {
+  private static final StringKeyConverter INSTANCE = new StringKeyConverter();
+
+  private StringKeyConverter() {
+  }
+
+  public static StringKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes the string for the SPACE and TAB separators, in that order.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(String key) {
+    final byte[] encodedKey =
+        Separator.encode(key, Separator.SPACE, Separator.TAB);
+    return encodedKey;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes the bytes for the TAB and SPACE separators, in that order (the
+   * reverse of the order used while encoding).
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public String decode(byte[] bytes) {
+    final String decodedKey =
+        Separator.decode(bytes, Separator.TAB, Separator.SPACE);
+    return decodedKey;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 18f975a..d52a5d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -48,18 +46,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* A bunch of utility functions used across TimelineReader and TimelineWriter.
@@ -72,109 +69,10 @@ public final class TimelineStorageUtils {
private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
- /** empty bytes. */
- public static final byte[] EMPTY_BYTES = new byte[0];
-
- /** indicator for no limits for splitting. */
- public static final int NO_LIMIT_SPLIT = -1;
-
/** milliseconds in one day. */
public static final long MILLIS_ONE_DAY = 86400000L;
/**
- * Splits the source array into multiple array segments using the given
- * separator, up to a maximum of count items. This will naturally produce
- * copied byte arrays for each of the split segments. To identify the split
- * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
- *
- * @param source Source array.
- * @param separator Separator represented as a byte array.
- * @return byte[][] after splitting the source
- */
- public static byte[][] split(byte[] source, byte[] separator) {
- return split(source, separator, NO_LIMIT_SPLIT);
- }
-
- /**
- * Splits the source array into multiple array segments using the given
- * separator, up to a maximum of count items. This will naturally produce
- * copied byte arrays for each of the split segments. To identify the split
- * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
- *
- * @param source Source array.
- * @param separator Separator represented as a byte array.
- * @param limit a non-positive value indicates no limit on number of segments.
- * @return byte[][] after splitting the input source.
- */
- public static byte[][] split(byte[] source, byte[] separator, int limit) {
- List<Range> segments = splitRanges(source, separator, limit);
-
- byte[][] splits = new byte[segments.size()][];
- for (int i = 0; i < segments.size(); i++) {
- Range r = segments.get(i);
- byte[] tmp = new byte[r.length()];
- if (tmp.length > 0) {
- System.arraycopy(source, r.start(), tmp, 0, r.length());
- }
- splits[i] = tmp;
- }
- return splits;
- }
-
- /**
- * Returns a list of ranges identifying [start, end) -- closed, open --
- * positions within the source byte array that would be split using the
- * separator byte array.
- *
- * @param source Source array.
- * @param separator Separator represented as a byte array.
- * @return a list of ranges.
- */
- public static List<Range> splitRanges(byte[] source, byte[] separator) {
- return splitRanges(source, separator, NO_LIMIT_SPLIT);
- }
-
- /**
- * Returns a list of ranges identifying [start, end) -- closed, open --
- * positions within the source byte array that would be split using the
- * separator byte array.
- *
- * @param source the source data
- * @param separator the separator pattern to look for
- * @param limit the maximum number of splits to identify in the source
- * @return a list of ranges.
- */
- public static List<Range> splitRanges(byte[] source, byte[] separator,
- int limit) {
- List<Range> segments = new ArrayList<Range>();
- if ((source == null) || (separator == null)) {
- return segments;
- }
- int start = 0;
- itersource: for (int i = 0; i < source.length; i++) {
- for (int j = 0; j < separator.length; j++) {
- if (source[i + j] != separator[j]) {
- continue itersource;
- }
- }
- // all separator elements matched
- if (limit > 0 && segments.size() >= (limit - 1)) {
- // everything else goes in one final segment
- break;
- }
- segments.add(new Range(start, i));
- start = i + separator.length;
- // i will be incremented again in outer for loop
- i += separator.length - 1;
- }
- // add in remaining to a final range
- if (start <= source.length) {
- segments.add(new Range(start, source.length));
- }
- return segments;
- }
-
- /**
* Converts a timestamp into its inverse timestamp to be used in (row) keys
* where we want to have the most recent timestamp in the top of the table
* (scans start at the most recent timestamp first).
@@ -200,53 +98,6 @@ public final class TimelineStorageUtils {
return Integer.MAX_VALUE - key;
}
-
- /**
- * Converts/encodes a string app Id into a byte representation for (row) keys.
- * For conversion, we extract cluster timestamp and sequence id from the
- * string app id (calls {@link ConverterUtils#toApplicationId(String)} for
- * conversion) and then store it in a byte array of length 12 (8 bytes (long)
- * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster
- * timestamp and sequence id are inverted so that the most recent cluster
- * timestamp and highest sequence id appears first in the table (i.e.
- * application id appears in a descending order).
- *
- * @param appIdStr application id in string format i.e.
- * application_{cluster timestamp}_{sequence id with min 4 digits}
- *
- * @return encoded byte representation of app id.
- */
- public static byte[] encodeAppId(String appIdStr) {
- ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
- byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT];
- byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp()));
- System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
- byte[] seqId = Bytes.toBytes(invertInt(appId.getId()));
- System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
- return appIdBytes;
- }
-
- /**
- * Converts/decodes a 12 byte representation of app id for (row) keys to an
- * app id in string format which can be returned back to client.
- * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
- * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
- * {@link ApplicationId#toString} to generate string representation of app id.
- *
- * @param appIdBytes application id in byte representation.
- *
- * @return decoded app id in string format.
- */
- public static String decodeAppId(byte[] appIdBytes) {
- if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) {
- throw new IllegalArgumentException("Invalid app id in byte format");
- }
- long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
- int seqId =
- invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
- return ApplicationId.newInstance(clusterTs, seqId).toString();
- }
-
/**
* returns the timestamp of that day's start (which is midnight 00:00:00 AM)
* for a given input timestamp.
@@ -810,7 +661,8 @@ public final class TimelineStorageUtils {
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isRelatedTo) throws IOException {
// isRelatedTo and relatesTo are of type Map<String, Set<String>>
- Map<String, Object> columns = prefix.readResults(result);
+ Map<String, Object> columns =
+ prefix.readResults(result, StringKeyConverter.getInstance());
for (Map.Entry<String, Object> column : columns.entrySet()) {
for (String id : Separator.VALUES.splitEncoded(
column.getValue().toString())) {
@@ -837,7 +689,8 @@ public final class TimelineStorageUtils {
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isConfig) throws IOException {
// info and configuration are of type Map<String, Object or String>
- Map<String, Object> columns = prefix.readResults(result);
+ Map<String, Object> columns =
+ prefix.readResults(result, StringKeyConverter.getInstance());
if (isConfig) {
for (Map.Entry<String, Object> column : columns.entrySet()) {
entity.addConfig(column.getKey(), column.getValue().toString());
@@ -861,30 +714,24 @@ public final class TimelineStorageUtils {
public static <T> void readEvents(TimelineEntity entity, Result result,
ColumnPrefix<T> prefix) throws IOException {
Map<String, TimelineEvent> eventsMap = new HashMap<>();
- Map<?, Object> eventsResult =
- prefix.readResultsHavingCompoundColumnQualifiers(result);
- for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
- byte[][] karr = (byte[][])eventResult.getKey();
- // the column name is of the form "eventId=timestamp=infoKey"
- if (karr.length == 3) {
- String id = Bytes.toString(karr[0]);
- long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
- String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
- TimelineEvent event = eventsMap.get(key);
- if (event == null) {
- event = new TimelineEvent();
- event.setId(id);
- event.setTimestamp(ts);
- eventsMap.put(key, event);
- }
- // handle empty info
- String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
- if (infoKey != null) {
- event.addInfo(infoKey, eventResult.getValue());
- }
- } else {
- LOG.warn("incorrectly formatted column name: it will be discarded");
- continue;
+ Map<EventColumnName, Object> eventsResult =
+ prefix.readResults(result, EventColumnNameConverter.getInstance());
+ for (Map.Entry<EventColumnName, Object>
+ eventResult : eventsResult.entrySet()) {
+ EventColumnName eventColumnName = eventResult.getKey();
+ String key = eventColumnName.getId() +
+ Long.toString(eventColumnName.getTimestamp());
+ // Retrieve previously seen event to add to it
+ TimelineEvent event = eventsMap.get(key);
+ if (event == null) {
+ // First time we're seeing this event, add it to the eventsMap
+ event = new TimelineEvent();
+ event.setId(eventColumnName.getId());
+ event.setTimestamp(eventColumnName.getTimestamp());
+ eventsMap.put(key, event);
+ }
+ if (eventColumnName.getInfoKey() != null) {
+ event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
}
}
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index de2b29d..02a4bb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@@ -78,7 +79,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
- private final boolean compoundColQual;
/**
* Private constructor, meant to be used by the enum definition.
@@ -122,7 +122,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
this.columnPrefixBytes =
Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
- this.compoundColQual = compondColQual;
}
/**
@@ -154,14 +153,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
return column.getValueConverter();
}
- public byte[] getCompoundColQualBytes(String qualifier,
- byte[]...components) {
- if (!compoundColQual) {
- return ColumnHelper.getColumnQualifier(null, qualifier);
- }
- return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
- }
-
/*
* (non-Javadoc)
*
@@ -233,26 +224,12 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResults(org.apache.hadoop.hbase.client.Result)
+ * #readResults(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
- public Map<String, Object> readResults(Result result) throws IOException {
- return column.readResults(result, columnPrefixBytes);
- }
-
- /**
- * @param result from which to read columns
- * @return the latest values of columns in the column family. The column
- * qualifier is returned as a list of parts, each part a byte[]. This
- * is to facilitate returning byte arrays of values that were not
- * Strings. If they can be treated as Strings, you should use
- * {@link #readResults(Result)} instead.
- * @throws IOException if there is any exception encountered while reading
- * result.
- */
- public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
- throws IOException {
- return column.readResultsHavingCompoundColumnQualifiers(result,
- columnPrefixBytes);
+ public <K> Map<K, Object> readResults(Result result,
+ KeyConverter<K> keyConverter) throws IOException {
+ return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
@@ -260,11 +237,14 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+ * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
- public <V> NavigableMap<String, NavigableMap<Long, V>>
- readResultsWithTimestamps(Result result) throws IOException {
- return column.readResultsWithTimestamps(result, columnPrefixBytes);
+ public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+ readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+ throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes,
+ keyConverter);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 04c633c..6d08390 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -17,10 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
/**
* Represents a rowkey for the entity table.
*/
@@ -28,13 +24,13 @@ public class EntityRowKey {
private final String clusterId;
private final String userId;
private final String flowName;
- private final long flowRunId;
+ private final Long flowRunId;
private final String appId;
private final String entityType;
private final String entityId;
public EntityRowKey(String clusterId, String userId, String flowName,
- long flowRunId, String appId, String entityType, String entityId) {
+ Long flowRunId, String appId, String entityType, String entityId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowName = flowName;
@@ -56,7 +52,7 @@ public class EntityRowKey {
return flowName;
}
- public long getFlowRunId() {
+ public Long getFlowRunId() {
return flowRunId;
}
@@ -85,14 +81,8 @@ public class EntityRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName, Long flowRunId, String appId) {
- byte[] first =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
- flowName));
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
- byte[] third = TimelineStorageUtils.encodeAppId(appId);
- return Separator.QUALIFIERS.join(first, second, third, new byte[0]);
+ return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
+ clusterId, userId, flowName, flowRunId, appId, null, null));
}
/**
@@ -111,16 +101,8 @@ public class EntityRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName, Long flowRunId, String appId, String entityType) {
- byte[] first =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
- flowName));
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
- byte[] third = TimelineStorageUtils.encodeAppId(appId);
- byte[] fourth =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, ""));
- return Separator.QUALIFIERS.join(first, second, third, fourth);
+ return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
+ clusterId, userId, flowName, flowRunId, appId, entityType, null));
}
/**
@@ -140,16 +122,8 @@ public class EntityRowKey {
public static byte[] getRowKey(String clusterId, String userId,
String flowName, Long flowRunId, String appId, String entityType,
String entityId) {
- byte[] first =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
- flowName));
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
- byte[] third = TimelineStorageUtils.encodeAppId(appId);
- byte[] fourth =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId));
- return Separator.QUALIFIERS.join(first, second, third, fourth);
+ return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
+ clusterId, userId, flowName, flowRunId, appId, entityType, entityId));
}
/**
@@ -159,27 +133,6 @@ public class EntityRowKey {
* @return An <cite>EntityRowKey</cite> object.
*/
public static EntityRowKey parseRowKey(byte[] rowKey) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
- if (rowKeyComponents.length < 7) {
- throw new IllegalArgumentException("the row key is not valid for " +
- "an entity");
- }
-
- String userId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
- String clusterId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
- String flowName =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
- long flowRunId =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
- String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
- String entityType =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
- String entityId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
- return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
- entityType, entityId);
+ return EntityRowKeyConverter.getInstance().decode(rowKey);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
new file mode 100644
index 0000000..43c0569
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes row key for entity table.
+ * The row key is of the form :
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
+ * flowRunId is a long, appId is encoded/decoded using
+ * {@link AppIdKeyConverter} and the rest are strings.
+ */
+public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
+ private static final EntityRowKeyConverter INSTANCE =
+ new EntityRowKeyConverter();
+
+ public static EntityRowKeyConverter getInstance() {
+ return INSTANCE;
+ }
+
+ private EntityRowKeyConverter() {
+ }
+
+ // Entity row key is of the form
+ // userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
+ // segment separated by !. The sizes below indicate sizes of each one of these
+ // segments in sequence. clusterId, userName, flowName, entityType and
+ // entityId are strings. flowRunId is a long hence 8 bytes in size. app id is
+ // represented as 12 bytes with cluster timestamp part of appid being 8 bytes
+ // (long) and seq id being 4 bytes(int).
+ // Strings are variable in size (i.e. end whenever separator is encountered).
+ // This is used while decoding and helps in determining where to split.
+ private static final int[] SEGMENT_SIZES = {
+ Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+ Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(),
+ Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes EntityRowKey object into a byte array with each component/field in
+ * EntityRowKey separated by Separator#QUALIFIERS. This leads to an entity
+ * table row key of the form
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+ * If entityType in the passed EntityRowKey object is null (and the fields
+ * preceding it, i.e. clusterId, userId, flowName, flowRunId and appId, are
+ * not null), this returns a row key prefix of the form
+ * userName!clusterId!flowName!flowRunId!appId! and if entityId in
+ * EntityRowKey is null (other 6 components are not null), this returns a row
+ * key prefix of the form
+ * userName!clusterId!flowName!flowRunId!appId!entityType!
+ * flowRunId is inverted while encoding as it helps maintain a descending
+ * order for row keys in entity table.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+ * #encode(java.lang.Object)
+ */
+ @Override
+ public byte[] encode(EntityRowKey rowKey) {
+ byte[] user = Separator.encode(rowKey.getUserId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+ byte[] cluster = Separator.encode(rowKey.getClusterId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+ byte[] flow = Separator.encode(rowKey.getFlowName(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+ byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
+ // Note that flowRunId is a long, so we can't encode them all at the same
+ // time.
+ byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
+ rowKey.getFlowRunId()));
+ byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+ if (rowKey.getEntityType() == null) {
+ return Separator.QUALIFIERS.join(
+ first, second, third, Separator.EMPTY_BYTES);
+ }
+ byte[] entityType = Separator.encode(rowKey.getEntityType(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+ byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES :
+ Separator.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS);
+ byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
+ return Separator.QUALIFIERS.join(first, second, third, fourth);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Decodes an entity row key of the form
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId represented
+ * in byte format and converts it into an EntityRowKey object. flowRunId is
+ * inverted while decoding as it was inverted while encoding.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+ * #decode(byte[])
+ */
+ @Override
+ public EntityRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 7) {
+ throw new IllegalArgumentException("the row key is not valid for " +
+ "an entity");
+ }
+ String userId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ Long flowRunId =
+ TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+ String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
+ String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String entityId =Separator.decode(Bytes.toString(rowKeyComponents[6]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+ entityType, entityId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 188c2fe..71c3d90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@@ -51,7 +52,6 @@ public enum FlowActivityColumnPrefix
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
- private final boolean compoundColQual;
private final AggregationOperation aggOp;
@@ -83,7 +83,6 @@ public enum FlowActivityColumnPrefix
.encode(columnPrefix));
}
this.aggOp = aggOp;
- this.compoundColQual = compoundColQual;
}
/**
@@ -169,10 +168,12 @@ public enum FlowActivityColumnPrefix
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResults(org.apache.hadoop.hbase.client.Result)
+ * #readResults(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
- public Map<String, Object> readResults(Result result) throws IOException {
- return column.readResults(result, columnPrefixBytes);
+ public <K> Map<K, Object> readResults(Result result,
+ KeyConverter<K> keyConverter) throws IOException {
+ return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
@@ -180,11 +181,14 @@ public enum FlowActivityColumnPrefix
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+ * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
- public <T> NavigableMap<String, NavigableMap<Long, T>>
- readResultsWithTimestamps(Result result) throws IOException {
- return column.readResultsWithTimestamps(result, columnPrefixBytes);
+ public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+ readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+ throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes,
+ keyConverter);
}
/**
@@ -270,20 +274,4 @@ public enum FlowActivityColumnPrefix
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
}
-
- @Override
- public byte[] getCompoundColQualBytes(String qualifier,
- byte[]...components) {
- if (!compoundColQual) {
- return ColumnHelper.getColumnQualifier(null, qualifier);
- }
- return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
- }
-
- @Override
- public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
- throws IOException {
- // There are no compound column qualifiers for flow activity table.
- return null;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 2726ae2..eea38a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
@@ -27,11 +25,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStor
public class FlowActivityRowKey {
private final String clusterId;
- private final long dayTs;
+ private final Long dayTs;
private final String userId;
private final String flowName;
- public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+ public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
String flowName) {
this.clusterId = clusterId;
this.dayTs = dayTs;
@@ -43,7 +41,7 @@ public class FlowActivityRowKey {
return clusterId;
}
- public long getDayTimestamp() {
+ public Long getDayTimestamp() {
return dayTs;
}
@@ -63,7 +61,8 @@ public class FlowActivityRowKey {
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId) {
- return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+ return FlowActivityRowKeyConverter.getInstance().encode(
+ new FlowActivityRowKey(clusterId, null, null, null));
}
/**
@@ -75,9 +74,8 @@ public class FlowActivityRowKey {
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
- return Separator.QUALIFIERS.join(
- Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
- Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), new byte[0]);
+ return FlowActivityRowKeyConverter.getInstance().encode(
+ new FlowActivityRowKey(clusterId, dayTs, null, null));
}
/**
@@ -94,12 +92,8 @@ public class FlowActivityRowKey {
String flowName) {
// convert it to Day's time stamp
eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
-
- return Separator.QUALIFIERS.join(
- Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
- Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
- Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
- Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
+ return FlowActivityRowKeyConverter.getInstance().encode(
+ new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
}
/**
@@ -109,21 +103,6 @@ public class FlowActivityRowKey {
* @return A <cite>FlowActivityRowKey</cite> object.
*/
public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
- if (rowKeyComponents.length < 4) {
- throw new IllegalArgumentException("the row key is not valid for "
- + "a flow activity");
- }
-
- String clusterId = Separator.QUALIFIERS.decode(Bytes
- .toString(rowKeyComponents[0]));
- long dayTs =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
- String userId = Separator.QUALIFIERS.decode(Bytes
- .toString(rowKeyComponents[2]));
- String flowName = Separator.QUALIFIERS.decode(Bytes
- .toString(rowKeyComponents[3]));
- return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+ return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
new file mode 100644
index 0000000..9dc4c98
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes row key for flow activity table.
+ * The row key is of the form: clusterId!dayTimestamp!user!flowName.
+ * dayTimestamp (top of the day timestamp) is a long and the rest are strings.
+ */
+public final class FlowActivityRowKeyConverter implements
+ KeyConverter<FlowActivityRowKey> {
+ private static final FlowActivityRowKeyConverter INSTANCE =
+ new FlowActivityRowKeyConverter();
+
+ public static FlowActivityRowKeyConverter getInstance() {
+ return INSTANCE;
+ }
+
+ private FlowActivityRowKeyConverter() {
+ }
+
+ // Flow activity row key is of the form clusterId!dayTimestamp!user!flowName
+ // with each segment separated by !. The sizes below indicate sizes of each
+ // one of these segments in sequence. clusterId, user and flowName are
+ // strings. Top of the day timestamp is a long hence 8 bytes in size.
+ // Strings are variable in size (i.e. end whenever separator is encountered).
+ // This is used while decoding and helps in determining where to split.
+ private static final int[] SEGMENT_SIZES = {
+ Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE,
+ Separator.VARIABLE_SIZE };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes FlowActivityRowKey object into a byte array with each
+ * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
+ * This leads to a flow activity table row key of the form
+ * clusterId!dayTimestamp!user!flowName
+ * If dayTimestamp in passed FlowActivityRowKey object is null and clusterId
+ * is not null, this returns a row key prefix as clusterId! and if userId in
+ * FlowActivityRowKey is null (and the fields preceding it i.e. clusterId and
+ * dayTimestamp are not null), this returns a row key prefix as
+ * clusterId!dayTimestamp!
+ * dayTimestamp is inverted while encoding as it helps maintain a descending
+ * order for row keys in flow activity table.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+ * #encode(java.lang.Object)
+ */
+
+ @Override
+ public byte[] encode(FlowActivityRowKey rowKey) {
+ if (rowKey.getDayTimestamp() == null) {
+ return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
+ Separator.EMPTY_BYTES);
+ }
+ if (rowKey.getUserId() == null) {
+ return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
+ Bytes.toBytes(TimelineStorageUtils.invertLong(
+ rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
+ }
+ return Separator.QUALIFIERS.join(
+ Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS),
+ Bytes.toBytes(
+ TimelineStorageUtils.invertLong(rowKey.getDayTimestamp())),
+ Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS),
+ Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS));
+ }
+
+ @Override
+ public FlowActivityRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 4) {
+ throw new IllegalArgumentException("the row key is not valid for "
+ + "a flow activity");
+ }
+ String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ Long dayTs =
+ TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
+ String userId = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String flowName = Separator.decode(Bytes.toString(rowKeyComponents[3]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 77f2ab2..0f14c89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -26,10 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
/**
@@ -40,8 +41,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
/**
* To store flow run info values.
*/
- METRIC(FlowRunColumnFamily.INFO, "m", null,
- LongConverter.getInstance());
+ METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
private final ColumnHelper<FlowRunTable> column;
private final ColumnFamily<FlowRunTable> columnFamily;
@@ -52,17 +52,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
- private final boolean compoundColQual;
private final AggregationOperation aggOp;
/**
* Private constructor, meant to be used by the enum definition.
*
- * @param columnFamily
- * that this column is stored in.
- * @param columnPrefix
- * for this column.
+ * @param columnFamily that this column is stored in.
+ * @param columnPrefix for this column.
*/
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
String columnPrefix, AggregationOperation fra, ValueConverter converter) {
@@ -79,11 +76,10 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
this.columnPrefixBytes = null;
} else {
// Future-proof by ensuring the right column prefix hygiene.
- this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
- .encode(columnPrefix));
+ this.columnPrefixBytes =
+ Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
this.aggOp = fra;
- this.compoundColQual = compoundColQual;
}
/**
@@ -99,14 +95,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifierPrefix);
+ return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+ qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifierPrefix);
+ return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+ qualifierPrefix);
}
@Override
@@ -139,8 +135,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@@ -166,8 +162,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@@ -180,8 +176,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
* #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
*/
public Object readResult(Result result, String qualifier) throws IOException {
- byte[] columnQualifier = ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier =
+ ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
return column.readResult(result, columnQualifier);
}
@@ -190,10 +186,12 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResults(org.apache.hadoop.hbase.client.Result)
+ * #readResults(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
- public Map<String, Object> readResults(Result result) throws IOException {
- return column.readResults(result, columnPrefixBytes);
+ public <K> Map<K, Object> readResults(Result result,
+ KeyConverter<K> keyConverter) throws IOException {
+ return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
@@ -201,11 +199,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+ * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
- public <T> NavigableMap<String, NavigableMap<Long, T>>
- readResultsWithTimestamps(Result result) throws IOException {
- return column.readResultsWithTimestamps(result, columnPrefixBytes);
+ public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+ readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+ throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes,
+ keyConverter);
}
/**
@@ -213,8 +214,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
* no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
* and only if {@code x.equals(y)} or {@code (x == y == null)}
*
- * @param columnPrefix
- * Name of the column to retrieve
+ * @param columnPrefix Name of the column to retrieve
* @return the corresponding {@link FlowRunColumnPrefix} or null
*/
public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
@@ -242,10 +242,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
* {@code columnFor(a,x) == columnFor(b,y)} if and only if
* {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
*
- * @param columnFamily
- * The columnFamily for which to retrieve the column.
- * @param columnPrefix
- * Name of the column to retrieve
+ * @param columnFamily The columnFamily for which to retrieve the column.
+ * @param columnPrefix Name of the column to retrieve
* @return the corresponding {@link FlowRunColumnPrefix} or null if both
* arguments don't match.
*/
@@ -267,20 +265,4 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
// Default to null
return null;
}
-
- @Override
- public byte[] getCompoundColQualBytes(String qualifier,
- byte[]...components) {
- if (!compoundColQual) {
- return ColumnHelper.getColumnQualifier(null, qualifier);
- }
- return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
- }
-
- @Override
- public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
- throws IOException {
- // There are no compound column qualifiers for flow run table.
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ad33a07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index eac8f05..925242b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -17,10 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
/**
* Represents a rowkey for the flow run table.
*/
@@ -28,10 +24,10 @@ public class FlowRunRowKey {
private final String clusterId;
private final String userId;
private final String flowName;
- private final long flowRunId;
+ private final Long flowRunId;
public FlowRunRowKey(String clusterId, String userId, String flowName,
- long flowRunId) {
+ Long flowRunId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowName = flowName;
@@ -50,7 +46,7 @@ public class FlowRunRowKey {
return flowName;
}
- public long getFlowRunId() {
+ public Long getFlowRunId() {
return flowRunId;
}
@@ -65,13 +61,13 @@ public class FlowRunRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName) {
- return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
- flowName, ""));
+ return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
+ clusterId, userId, flowName, null));
}
/**
* Constructs a row key for the flow run table as follows: {
- * clusterId!userI!flowName!Inverted Flow Run Id}.
+ * clusterId!userId!flowName!Inverted Flow Run Id}.
*
* @param clusterId Cluster Id.
* @param userId User Id.
@@ -81,12 +77,8 @@ public class FlowRunRowKey {
*/
public static byte[] getRowKey(String clusterId, String userId,
String flowName, Long flowRunId) {
- byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
- userId, flowName));
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
- return Separator.QUALIFIERS.join(first, second);
+ return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
+ clusterId, userId, flowName, flowRunId));
}
/**
@@ -96,22 +88,7 @@ public class FlowRunRowKey {
* @return A <cite>FlowRunRowKey</cite> object.
*/
public static FlowRunRowKey parseRowKey(byte[] rowKey) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
- if (rowKeyComponents.length < 4) {
- throw new IllegalArgumentException("the row key is not valid for " +
- "a flow run");
- }
-
- String clusterId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
- String userId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
- String flowName =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
- long flowRunId =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
- return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+ return FlowRunRowKeyConverter.getInstance().decode(rowKey);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org