Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/06/18 19:49:36 UTC
[1/2] hadoop git commit: YARN-3706. Generalize native HBase writer
for additional tables (Joep Rottinghuis via sjlee)
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 a1bb9137a -> 9137aeae0
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
new file mode 100644
index 0000000..ee57890
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Used to separate row qualifiers, column qualifiers and compound fields.
+ */
+public enum Separator {
+
+ /**
+ * separator in key or column qualifier fields
+ */
+ QUALIFIERS("!", "%0$"),
+
+ /**
+ * separator in values, and/or compound key/column qualifier fields.
+ */
+ VALUES("?", "%1$"),
+
+ /**
+ * separator in values, often used to avoid having these in qualifiers and
+ * names. Note that if we use HTML form encoding through URLEncoder, we end up
+ * getting a + for a space, which may already occur in strings, so we don't
+ * want that.
+ */
+ SPACE(" ", "%2$");
+
+ /**
+ * The string value of this separator.
+ */
+ private final String value;
+
+ /**
+ * The URLEncoded version of this separator
+ */
+ private final String encodedValue;
+
+ /**
+ * The byte representation of the value.
+ */
+ private final byte[] bytes;
+
+ /**
+ * The value quoted so that it can be used as a safe regex
+ */
+ private final String quotedValue;
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
+ /**
+ * @param value of the separator to use. Cannot be null or empty string.
+ * @param encodedValue choose something that isn't likely to occur in the data
+ * itself. Cannot be null or empty string.
+ */
+ private Separator(String value, String encodedValue) {
+ this.value = value;
+ this.encodedValue = encodedValue;
+
+ // validation
+ if (value == null || value.length() == 0 || encodedValue == null
+ || encodedValue.length() == 0) {
+ throw new IllegalArgumentException(
+ "Cannot create separator from null or empty string.");
+ }
+
+ this.bytes = Bytes.toBytes(value);
+ this.quotedValue = Pattern.quote(value);
+ }
+
+ /**
+ * Used to make token safe to be used with this separator without collisions.
+ *
+ * @param token the token in which to encode occurrences of this separator.
+ * @return the token with any occurrences of this separator URLEncoded.
+ */
+ public String encode(String token) {
+ if (token == null || token.length() == 0) {
+ // Nothing to replace
+ return token;
+ }
+ return token.replace(value, encodedValue);
+ }
+
+ /**
+ * @param token the token in which to decode the encoded separator.
+ * @return the token with any occurrences of the encoded separator replaced by
+ * the separator itself.
+ */
+ public String decode(String token) {
+ if (token == null || token.length() == 0) {
+ // Nothing to replace
+ return token;
+ }
+ return token.replace(encodedValue, value);
+ }
+
+ /**
+ * Encode the given separators in the token with their encoding equivalent.
+ * This means that when encoding is already present in the token itself, this
+ * is not a reversible process. See also {@link #decode(String, Separator...)}
+ *
+ * @param token containing possible separators that need to be encoded.
+ * @param separators to be encoded in the token with their URLEncoding
+ * equivalent.
+ * @return non-null byte representation of the token with occurrences of the
+ * separators encoded.
+ */
+ public static byte[] encode(String token, Separator... separators) {
+ if (token == null) {
+ return EMPTY_BYTES;
+ }
+ String result = token;
+ for (Separator separator : separators) {
+ if (separator != null) {
+ result = separator.encode(result);
+ }
+ }
+ return Bytes.toBytes(result);
+ }
+
+ /**
+ * Decode the given separators in the token from their encoded equivalent.
+ * This means that if the original data already contained the URL-encoded
+ * form of a separator, decoding is not a reversible process.
+ *
+ * @param token containing possible encoded separators that need to be
+ * decoded.
+ * @param separators to be decoded in the token from their URLEncoding
+ * equivalent.
+ * @return String representation of the token with occurrences of the URL
+ * encoded separators decoded.
+ */
+ public static String decode(byte[] token, Separator... separators) {
+ if (token == null) {
+ return null;
+ }
+ return decode(Bytes.toString(token), separators);
+ }
+
+ /**
+ * Decode the given separators in the token from their encoded equivalent.
+ * This means that if the original data already contained the URL-encoded
+ * form of a separator, decoding is not a reversible process.
+ *
+ * @param token containing possible encoded separators that need to be
+ * decoded.
+ * @param separators to be decoded in the token from their URLEncoding
+ * equivalent.
+ * @return String representation of the token with occurrences of the URL
+ * encoded separators decoded.
+ */
+ public static String decode(String token, Separator... separators) {
+ if (token == null) {
+ return null;
+ }
+ String result = token;
+ for (Separator separator : separators) {
+ if (separator != null) {
+ result = separator.decode(result);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns a single byte array containing all of the individual array
+ * components separated by this separator.
+ *
+ * @param components the byte arrays to join; null components are treated as
+ * empty but still contribute a separator.
+ * @return byte array after joining the components
+ */
+ public byte[] join(byte[]... components) {
+ if (components == null || components.length == 0) {
+ return EMPTY_BYTES;
+ }
+
+ // The separators defined above are single-byte ASCII, so the byte length
+ // of the separator equals this.value.length().
+ int finalSize = this.value.length() * (components.length - 1);
+ for (byte[] comp : components) {
+ if (comp != null) {
+ finalSize += comp.length;
+ }
+ }
+
+ byte[] buf = new byte[finalSize];
+ int offset = 0;
+ for (int i = 0; i < components.length; i++) {
+ if (components[i] != null) {
+ System.arraycopy(components[i], 0, buf, offset, components[i].length);
+ offset += components[i].length;
+ }
+ if (i < (components.length - 1)) {
+ System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
+ offset += this.value.length();
+ }
+
+ }
+ return buf;
+ }
+
+ /**
+ * Concatenates items (as String), using this separator.
+ *
+ * @param items Items to join; {@code toString()} will be called on each item.
+ * Any occurrence of the separator in the individual strings will be
+ * encoded first. Cannot be null.
+ * @return non-null joined result, reversible through
+ * {@link #splitEncoded(String)} as long as the items do not already
+ * contain the URL-encoded form of the separator.
+ */
+ public String joinEncoded(String... items) {
+ if (items == null || items.length == 0) {
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder(encode(items[0].toString()));
+ // Start at 1, we've already grabbed the first value at index 0
+ for (int i = 1; i < items.length; i++) {
+ sb.append(this.value);
+ sb.append(encode(items[i].toString()));
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Concatenates items (as String), using this separator.
+ *
+ * @param items Items to join; {@code toString()} will be called on each item.
+ * Any occurrence of the separator in the individual strings will be
+ * encoded first. Cannot be null.
+ * @return non-null joined result, reversible through
+ * {@link #splitEncoded(String)} as long as the items do not already
+ * contain the URL-encoded form of the separator.
+ */
+ public String joinEncoded(Iterable<?> items) {
+ if (items == null) {
+ return "";
+ }
+ Iterator<?> i = items.iterator();
+ if (!i.hasNext()) {
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder(encode(i.next().toString()));
+ while (i.hasNext()) {
+ sb.append(this.value);
+ sb.append(encode(i.next().toString()));
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * @param compoundValue containing individual values separated by this
+ * separator, which have that separator encoded.
+ * @return non-null collection of values from the compoundValue with the
+ * separator decoded.
+ */
+ public Collection<String> splitEncoded(String compoundValue) {
+ List<String> result = new ArrayList<String>();
+ if (compoundValue != null) {
+ for (String value : compoundValue.split(quotedValue)) {
+ result.add(decode(value));
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Splits the source array into multiple array segments using this separator,
+ * up to a maximum of limit segments. This will naturally produce copied byte
+ * arrays for each of the split segments.
+ * @param source to be split
+ * @param limit on how many segments are supposed to be returned. Negative
+ * value indicates no limit on number of segments.
+ * @return source split by this separator.
+ */
+ public byte[][] split(byte[] source, int limit) {
+ return TimelineWriterUtils.split(source, this.bytes, limit);
+ }
+
+}
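
As a quick illustration, here is a minimal sketch of how these separators round-trip a compound value. It assumes the Separator enum above is on the classpath; the values in comments are illustrative, not asserted by the patch.

import java.util.Collection;

public class SeparatorDemo {
  public static void main(String[] args) {
    // "!" inside an item would collide with the QUALIFIERS separator, so
    // joinEncoded() first replaces it with its encoded form "%0$".
    String joined =
        Separator.QUALIFIERS.joinEncoded("user1", "cluster!a", "flow");
    // joined == "user1!cluster%0$a!flow"

    // splitEncoded() splits on the raw separator and decodes each piece,
    // restoring the "!" inside the second item.
    Collection<String> parts = Separator.QUALIFIERS.splitEncoded(joined);
    System.out.println(joined + " -> " + parts);
    // user1!cluster%0$a!flow -> [user1, cluster!a, flow]
  }
}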
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
new file mode 100644
index 0000000..5518a27
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineEntitySchemaConstants {
+
+ /**
+ * Used to create a pre-split for tables starting with a username in the
+ * prefix. TODO: this may have to become a config variable (string with
+ * separators) so that different installations can presplit based on their own
+ * commonly occurring names.
+ */
+ private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
+ Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
+ Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
+ Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
+ Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
+ Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
+ Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
+ Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
+ Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
+ Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
+ Bytes.toBytes("z") };
+
+ /**
+ * The row key prefix length used when regions auto-split (consumed by
+ * KeyPrefixRegionSplitPolicy).
+ */
+ public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+ /**
+ * @return pre-split keys for tables where a username is the row key prefix.
+ */
+ public final static byte[][] getUsernameSplits() {
+ byte[][] kloon = USERNAME_SPLITS.clone();
+ // Deep copy.
+ for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+ kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+ }
+ return kloon;
+ }
+
+}
\ No newline at end of file
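
A small sketch of what the pre-splits buy us: HBase assigns a row to the region whose start key is the greatest split key less than or equal to the row key, so rows for a given user cluster together. This assumes the class above and hbase-common are on the classpath; the row key value is hypothetical.

import org.apache.hadoop.hbase.util.Bytes;

public class SplitDemo {
  public static void main(String[] args) {
    byte[][] splits = TimelineEntitySchemaConstants.getUsernameSplits();
    byte[] row = Bytes.toBytes("alice!cluster1!flow1");

    // Find the region start key: the greatest split key <= the row key.
    byte[] regionStart = null;
    for (byte[] split : splits) {
      if (Bytes.compareTo(row, split) >= 0) {
        regionStart = split;
      }
    }
    // "alice" sorts after "ad" and before "an", so this prints "ad".
    System.out.println(Bytes.toString(regionStart));
  }
}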
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
new file mode 100644
index 0000000..28a0b6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A collection of utility functions used across TimelineWriter classes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineWriterUtils {
+
+ /** empty bytes */
+ public static final byte[] EMPTY_BYTES = new byte[0];
+
+ /**
+ * Splits the source array into multiple array segments using the given
+ * separator, with no limit on the number of segments. This will naturally
+ * produce copied byte arrays for each of the split segments. To identify the
+ * split ranges without the array copies, see
+ * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+ *
+ * @param source the array to split
+ * @param separator the byte pattern to split on
+ * @return byte[][] array of segments after splitting the source
+ */
+ public static byte[][] split(byte[] source, byte[] separator) {
+ return split(source, separator, -1);
+ }
+
+ /**
+ * Splits the source array into multiple array segments using the given
+ * separator, up to a maximum of limit segments. This will naturally produce
+ * copied byte arrays for each of the split segments. To identify the split
+ * ranges without the array copies, see
+ * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+ *
+ * @param source the array to split
+ * @param separator the byte pattern to split on
+ * @param limit a negative value indicates no limit on number of segments.
+ * @return byte[][] after splitting the input source
+ */
+ public static byte[][] split(byte[] source, byte[] separator, int limit) {
+ List<Range> segments = splitRanges(source, separator, limit);
+
+ byte[][] splits = new byte[segments.size()][];
+ for (int i = 0; i < segments.size(); i++) {
+ Range r = segments.get(i);
+ byte[] tmp = new byte[r.length()];
+ if (tmp.length > 0) {
+ System.arraycopy(source, r.start(), tmp, 0, r.length());
+ }
+ splits[i] = tmp;
+ }
+ return splits;
+ }
+
+ /**
+ * Returns a list of ranges identifying [start, end) -- closed, open --
+ * positions within the source byte array that would be split using the
+ * separator byte array.
+ */
+ public static List<Range> splitRanges(byte[] source, byte[] separator) {
+ return splitRanges(source, separator, -1);
+ }
+
+ /**
+ * Returns a list of ranges identifying [start, end) -- closed, open --
+ * positions within the source byte array that would be split using the
+ * separator byte array.
+ *
+ * @param source the source data
+ * @param separator the separator pattern to look for
+ * @param limit the maximum number of splits to identify in the source
+ * @return the list of ranges; the final range covers any remainder
+ */
+ public static List<Range> splitRanges(byte[] source, byte[] separator,
+ int limit) {
+ List<Range> segments = new ArrayList<Range>();
+ if ((source == null) || (separator == null)) {
+ return segments;
+ }
+ int start = 0;
+ // stop early enough that a complete separator can still fit; this avoids
+ // reading past the end of source for multi-byte separators
+ itersource: for (int i = 0; i + separator.length <= source.length; i++) {
+ for (int j = 0; j < separator.length; j++) {
+ if (source[i + j] != separator[j]) {
+ continue itersource;
+ }
+ }
+ // all separator elements matched
+ if (limit > 0 && segments.size() >= (limit - 1)) {
+ // everything else goes in one final segment
+ break;
+ }
+
+ segments.add(new Range(start, i));
+ start = i + separator.length;
+ // i will be incremented again in outer for loop
+ i += separator.length - 1;
+ }
+ // add in remaining to a final range
+ if (start <= source.length) {
+ segments.add(new Range(start, source.length));
+ }
+ return segments;
+ }
+
+}
\ No newline at end of file
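
A short sketch of the split utilities above, including the limit semantics: everything past the last allowed split stays, separators included, in the final segment. It assumes hbase-common's Bytes is available; the row key is hypothetical.

import org.apache.hadoop.hbase.util.Bytes;

public class SplitLimitDemo {
  public static void main(String[] args) {
    byte[] rowKey = Bytes.toBytes("user!cluster!flow!run!app");
    byte[] sep = Bytes.toBytes("!");

    byte[][] all = TimelineWriterUtils.split(rowKey, sep);
    System.out.println(all.length); // 5

    // With a limit, the last allowed segment absorbs the remainder,
    // separators included.
    byte[][] capped = TimelineWriterUtils.split(rowKey, sep, 3);
    System.out.println(Bytes.toString(capped[2])); // flow!run!app
  }
}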
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
new file mode 100644
index 0000000..64a11f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.client.BufferedMutator;
+
+/**
+ * Just a typed wrapper around {@link BufferedMutator} used to ensure that
+ * columns can write only to the table mutator for the right table.
+ */
+public interface TypedBufferedMutator<T> extends BufferedMutator {
+ // This interface is intentionally left (almost) blank
+}
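
The type parameter T never appears in the interface body; it is a phantom type that lets the compiler reject writes through a mutator bound to the wrong table. A self-contained sketch of the idea, with hypothetical names:

interface Table {}

final class EntityTbl implements Table {}

final class FlowTbl implements Table {}

interface TypedMutator<T extends Table> {
  void mutate(byte[] row);
}

class PhantomTypeDemo {
  // Only a mutator typed to EntityTbl is accepted here.
  static void storeEntity(TypedMutator<EntityTbl> m, byte[] row) {
    m.mutate(row);
  }

  public static void main(String[] args) {
    TypedMutator<EntityTbl> entityMutator = row -> { };
    TypedMutator<FlowTbl> flowMutator = row -> { };
    storeEntity(entityMutator, new byte[0]); // compiles
    // storeEntity(flowMutator, new byte[0]); // compile error: wrong table
  }
}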
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
new file mode 100644
index 0000000..32577fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
new file mode 100644
index 0000000..90da966
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies fully qualified columns for the {@link EntityTable}.
+ */
+public enum EntityColumn implements Column<EntityTable> {
+
+ /**
+ * Identifier for the entity.
+ */
+ ID(EntityColumnFamily.INFO, "id"),
+
+ /**
+ * The type of entity.
+ */
+ TYPE(EntityColumnFamily.INFO, "type"),
+
+ /**
+ * When the entity was created.
+ */
+ CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
+
+ /**
+ * When the entity was last modified.
+ */
+ MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
+
+ /**
+ * The version of the flow that this entity belongs to.
+ */
+ FLOW_VERSION(EntityColumnFamily.INFO, "flow_version");
+
+ private final ColumnHelper<EntityTable> column;
+ private final ColumnFamily<EntityTable> columnFamily;
+ private final String columnQualifier;
+ private final byte[] columnQualifierBytes;
+
+ private EntityColumn(ColumnFamily<EntityTable> columnFamily,
+ String columnQualifier) {
+ this.columnFamily = columnFamily;
+ this.columnQualifier = columnQualifier;
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnQualifierBytes =
+ Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+ this.column = new ColumnHelper<EntityTable>(columnFamily);
+ }
+
+ /**
+ * @return the column name value
+ */
+ private String getColumnQualifier() {
+ return columnQualifier;
+ }
+
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
+ Object inputValue) throws IOException {
+ column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+ inputValue);
+ }
+
+ public Object readResult(Result result) throws IOException {
+ return column.readResult(result, columnQualifierBytes);
+ }
+
+ /**
+ * Retrieve an {@link EntityColumn} given a name, or null if there is no
+ * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+ * and only if {@code x.equals(y)} or {@code (x == y == null)}
+ *
+ * @param columnQualifier Name of the column to retrieve
+ * @return the corresponding {@link EntityColumn} or null
+ */
+ public static final EntityColumn columnFor(String columnQualifier) {
+
+ // Match column based on value, assume column family matches.
+ for (EntityColumn ec : EntityColumn.values()) {
+ // Find a match based only on name.
+ if (ec.getColumnQualifier().equals(columnQualifier)) {
+ return ec;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+
+ /**
+ * Retrieve an {@link EntityColumn} given a name, or null if there is no
+ * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+ * if and only if {@code a.equals(b) & x.equals(y)} or
+ * {@code (x == y == null)}
+ *
+ * @param columnFamily The columnFamily for which to retrieve the column.
+ * @param name Name of the column to retrieve
+ * @return the corresponding {@link EntityColumn} or null if both arguments
+ * don't match.
+ */
+ public static final EntityColumn columnFor(EntityColumnFamily columnFamily,
+ String name) {
+
+ for (EntityColumn ec : EntityColumn.values()) {
+ // Find a match based column family and on name.
+ if (ec.columnFamily.equals(columnFamily)
+ && ec.getColumnQualifier().equals(name)) {
+ return ec;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+
+}
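
A minimal sketch of the reverse lookup, e.g. when mapping raw qualifier strings read from HBase back to typed columns. It assumes the enum above is on the classpath.

public class ColumnForDemo {
  public static void main(String[] args) {
    // Qualifier strings match the names declared in the enum above.
    System.out.println(EntityColumn.columnFor("created_time")); // CREATED_TIME
    System.out.println(EntityColumn.columnFor("no_such_column")); // null
  }
}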
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
new file mode 100644
index 0000000..8a95d12
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the entity table column families.
+ */
+public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
+
+ /**
+ * Info column family houses known columns, specifically ones included in
+ * column family filters.
+ */
+ INFO("i"),
+
+ /**
+ * Configurations are in a separate column family for two reasons: a) the size
+ * of the config values can be very large and b) we expect that config values
+ * are often separately accessed from other metrics and info columns.
+ */
+ CONFIGS("c"),
+
+ /**
+ * Metrics have a separate column family, because they have a separate TTL.
+ */
+ METRICS("m");
+
+ /**
+ * Byte representation of this column family.
+ */
+ private final byte[] bytes;
+
+ /**
+ * @param value create a column family with this name. Must be lower case and
+ * without spaces.
+ */
+ private EntityColumnFamily(String value) {
+ // column families should be lower case and not contain any spaces.
+ this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+ }
+
+ public byte[] getBytes() {
+ return Bytes.copy(bytes);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
new file mode 100644
index 0000000..4459868
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the entity table.
+ */
+public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+
+ /**
+ * To store TimelineEntity getIsRelatedToEntities values.
+ */
+ IS_RELATED_TO(EntityColumnFamily.INFO, "s"),
+
+ /**
+ * To store TimelineEntity getRelatesToEntities values.
+ */
+ RELATES_TO(EntityColumnFamily.INFO, "r"),
+
+ /**
+ * Lifecycle events for an entity.
+ */
+ EVENT(EntityColumnFamily.INFO, "e"),
+
+ /**
+ * Config column stores configuration with config key as the column name.
+ */
+ CONFIG(EntityColumnFamily.CONFIGS, null),
+
+ /**
+ * Metrics are stored with the metric name as the column name.
+ */
+ METRIC(EntityColumnFamily.METRICS, null);
+
+ private final ColumnHelper<EntityTable> column;
+ private final ColumnFamily<EntityTable> columnFamily;
+
+ /**
+ * Can be null for those cases where the provided column qualifier is the
+ * entire column name.
+ */
+ private final String columnPrefix;
+ private final byte[] columnPrefixBytes;
+
+ /**
+ * Private constructor, meant to be used by the enum definition.
+ *
+ * @param columnFamily that this column is stored in.
+ * @param columnPrefix for this column.
+ */
+ private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+ String columnPrefix) {
+ column = new ColumnHelper<EntityTable>(columnFamily);
+ this.columnFamily = columnFamily;
+ this.columnPrefix = columnPrefix;
+ if (columnPrefix == null) {
+ this.columnPrefixBytes = null;
+ } else {
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnPrefixBytes =
+ Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+ }
+ }
+
+ /**
+ * @return the column name value
+ */
+ private String getColumnPrefix() {
+ return columnPrefix;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ */
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
+ Long timestamp, Object inputValue) throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier =
+ ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+ */
+ public Object readResult(Result result, String qualifier) throws IOException {
+ byte[] columnQualifier =
+ ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ return column.readResult(result, columnQualifier);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResults(org.apache.hadoop.hbase.client.Result)
+ */
+ public Map<String, Object> readResults(Result result) throws IOException {
+ return column.readResults(result, columnPrefixBytes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readTimeseriesResults(org.apache.hadoop.hbase.client.Result)
+ */
+ public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
+ Result result) throws IOException {
+ return column.readTimeseriesResults(result, columnPrefixBytes);
+ }
+
+ /**
+ * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no
+ * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+ * and only if {@code x.equals(y)} or {@code (x == y == null)}
+ *
+ * @param columnPrefix Name of the column to retrieve
+ * @return the corresponding {@link EntityColumnPrefix} or null
+ */
+ public static final EntityColumnPrefix columnFor(String columnPrefix) {
+
+ // Match column based on value, assume column family matches.
+ for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+ // Find a match based only on name.
+ if (ecp.getColumnPrefix().equals(columnPrefix)) {
+ return ecp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+
+ /**
+ * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no
+ * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+ * if and only if {@code (x == y == null)} or
+ * {@code a.equals(b) & x.equals(y)}
+ *
+ * @param columnFamily The columnFamily for which to retrieve the column.
+ * @param columnPrefix Name of the column to retrieve
+ * @return the corresponding {@link EntityColumnPrefix} or null if both
+ * arguments don't match.
+ */
+ public static final EntityColumnPrefix columnFor(
+ EntityColumnFamily columnFamily, String columnPrefix) {
+
+ // TODO: needs unit test to confirm and need to update javadoc to explain
+ // null prefix case.
+
+ for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+ // Find a match based column family and on name.
+ if (ecp.columnFamily.equals(columnFamily)
+ && (((columnPrefix == null) && (ecp.getColumnPrefix() == null)) || (ecp
+ .getColumnPrefix().equals(columnPrefix)))) {
+ return ecp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+
+}
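
A hedged sketch of the qualifier layout this implies, matching the entity table doc below where relates-to entries appear as "r!relatesToKey". ColumnHelper.getColumnQualifier() is defined elsewhere in this patch; here the same layout is reproduced with Separator directly, and the qualifier value is purely illustrative.

import org.apache.hadoop.hbase.util.Bytes;

public class PrefixedQualifierDemo {
  public static void main(String[] args) {
    // RELATES_TO has prefix "r"; each relates-to key becomes one column.
    byte[] qualifier = Separator.QUALIFIERS.join(
        Bytes.toBytes("r"),
        Bytes.toBytes(Separator.QUALIFIERS.encode("relatesToKey")));
    System.out.println(Bytes.toString(qualifier)); // r!relatesToKey
  }
}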
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
new file mode 100644
index 0000000..61958c2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the entity table.
+ */
+public class EntityRowKey {
+ // TODO: more methods are needed for this class.
+
+ // TODO: API needs to be cleaned up.
+
+ /**
+ * Constructs a row key prefix for the entity table as follows:
+ * {@code userName!clusterId!flowId!flowRunId!AppId}
+ *
+ * @param clusterId the cluster on which the flow ran
+ * @param userId the user that ran the flow
+ * @param flowId the id of the flow
+ * @param flowRunId the particular run of the flow
+ * @param appId the application id
+ * @return byte array with the row key prefix
+ */
+ public static byte[] getRowKeyPrefix(String clusterId, String userId,
+ String flowId, Long flowRunId, String appId) {
+ byte[] first =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+ flowId));
+ // Note that flowRunId is a long, so we can't encode them all at the same
+ // time.
+ byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+ byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
+ return Separator.QUALIFIERS.join(first, second, third);
+ }
+
+ /**
+ * Constructs a complete row key for the entity table as follows:
+ * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
+ *
+ * @param clusterId the cluster on which the flow ran
+ * @param userId the user that ran the flow
+ * @param flowId the id of the flow
+ * @param flowRunId the particular run of the flow
+ * @param appId the application id
+ * @param te the timeline entity whose type and id complete the row key
+ * @return byte array with the full row key
+ */
+ public static byte[] getRowKey(String clusterId, String userId,
+ String flowId, Long flowRunId, String appId, TimelineEntity te) {
+ byte[] first =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+ flowId));
+ // Note that flowRunId is a long, so we can't encode them all at the same
+ // time.
+ byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+ byte[] third =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
+ te.getId()));
+ return Separator.QUALIFIERS.join(first, second, third);
+ }
+
+ /**
+ * Converts a timestamp into its inverse timestamp to be used in (row) keys
+ * where we want to have the most recent timestamp at the top of the table
+ * (scans start at the most recent timestamp first).
+ *
+ * @param key value to be inverted so that the latest version will be first in
+ * a scan.
+ * @return inverted long
+ */
+ public static long invert(Long key) {
+ return Long.MAX_VALUE - key;
+ }
+
+}
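
A tiny sketch of why the row key embeds the inverted flow run id, assuming the class above is on the classpath:

public class InvertDemo {
  public static void main(String[] args) {
    long older = 1_000L;
    long newer = 2_000L;
    // The newer timestamp inverts to the smaller long, so it sorts first in
    // the row key and a scan reaches the most recent run before older ones.
    System.out.println(
        EntityRowKey.invert(newer) < EntityRowKey.invert(older)); // true
  }
}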
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
new file mode 100644
index 0000000..61f7c4c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants;
+
+/**
+ * The entity table has column families info, config and metrics: info stores
+ * information about a timeline entity object, config stores configuration
+ * data of a timeline entity object, and metrics stores the metrics of a
+ * timeline entity object.
+ *
+ * Example entity table record:
+ *
+ * <pre>
+ * |--------------------------------------------------------------------|
+ * | Row | Column Family | Column Family| Column Family|
+ * | key | info | metrics | config |
+ * |--------------------------------------------------------------------|
+ * | userName! | id:entityId | metricId1: | configKey1: |
+ * | clusterId! | | metricValue1 | configValue1 |
+ * | flowId! | type:entityType | @timestamp1 | |
+ * | flowRunId! | | | configKey2: |
+ * | AppId! | created_time: | metriciD1: | configValue2 |
+ * | entityType!| 1392993084018 | metricValue2 | |
+ * | entityId | | @timestamp2 | |
+ * | | modified_time: | | |
+ * | | 1392995081012 | metricId2: | |
+ * | | | metricValue1 | |
+ * | | r!relatesToKey: | @timestamp2 | |
+ * | | id3?id4?id5 | | |
+ * | | | | |
+ * | | s!isRelatedToKey | | |
+ * | | id7?id9?id6 | | |
+ * | | | | |
+ * | | e!eventId?eventInfoKey: | | |
+ * | | eventInfoValue | | |
+ * | | | | |
+ * | | flowVersion: | | |
+ * | | versionValue | | |
+ * |--------------------------------------------------------------------|
+ * </pre>
+ */
+public class EntityTable extends BaseTable<EntityTable> {
+ /** entity prefix */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
+
+ /** config param name that specifies the entity table name */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /**
+ * config param name that specifies the TTL for metrics column family in
+ * entity table
+ */
+ private static final String METRICS_TTL_CONF_NAME = PREFIX
+ + ".table.metrics.ttl";
+
+ /** default value for entity table name */
+ private static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+
+ /** default TTL is 30 days for metrics timeseries */
+ private static final int DEFAULT_METRICS_TTL = 2592000;
+
+ /** default max number of versions */
+ private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;
+
+ private static final Log LOG = LogFactory.getLog(EntityTable.class);
+
+ public EntityTable() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+ * (org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor entityTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(EntityColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ entityTableDescp.addFamily(infoCF);
+
+ HColumnDescriptor configCF =
+ new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes());
+ configCF.setBloomFilterType(BloomType.ROWCOL);
+ configCF.setBlockCacheEnabled(true);
+ entityTableDescp.addFamily(configCF);
+
+ HColumnDescriptor metricsCF =
+ new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes());
+ entityTableDescp.addFamily(metricsCF);
+ metricsCF.setBlockCacheEnabled(true);
+ // always keep 1 version (the latest)
+ metricsCF.setMinVersions(1);
+ metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+ metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+ DEFAULT_METRICS_TTL));
+ entityTableDescp
+ .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+ entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+ TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+ admin.createTable(entityTableDescp,
+ TimelineEntitySchemaConstants.getUsernameSplits());
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+
+ /**
+ * @param metricsTTL time to live parameter for the metrics in this table.
+ * @param hbaseConf configuration in which to set the metrics TTL config
+ * variable.
+ */
+ public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+ hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+ }
+
+}
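
A sketch of overriding the table name and metrics TTL through the config keys defined above before creating the schema. The table name value and the Admin handle are assumptions; only TABLE_NAME_CONF_NAME and setMetricsTTL() come from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EntityTableSetup {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Override the default table name "timelineservice.entity".
    conf.set(EntityTable.TABLE_NAME_CONF_NAME, "prod.timelineservice.entity");

    EntityTable table = new EntityTable();
    // Keep metrics timeseries for 7 days instead of the 30-day default.
    table.setMetricsTTL(7 * 24 * 60 * 60, conf);
    // table.createTable(admin, conf); // admin obtained from an HBase Connection
  }
}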
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
new file mode 100644
index 0000000..26f1cc5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index f999b4d..6abf240 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -18,43 +18,41 @@
package org.apache.hadoop.yarn.server.timelineservice.storage;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
/**
- * Unit test HBaseTimelineWriterImpl
- * YARN 3411
- *
* @throws Exception
*/
public class TestHBaseTimelineWriterImpl {
@@ -69,12 +67,8 @@ public class TestHBaseTimelineWriterImpl {
}
private static void createSchema() throws IOException {
- byte[][] families = new byte[3][];
- families[0] = EntityColumnFamily.INFO.getInBytes();
- families[1] = EntityColumnFamily.CONFIG.getInBytes();
- families[2] = EntityColumnFamily.METRICS.getInBytes();
- TimelineSchemaCreator.createTimelineEntityTable(util.getHBaseAdmin(),
- util.getConfiguration());
+ new EntityTable()
+ .createTable(util.getHBaseAdmin(), util.getConfiguration());
}
@Test
@@ -151,18 +145,15 @@ public class TestHBaseTimelineWriterImpl {
// scan the table and see that entity exists
Scan s = new Scan();
- byte[] startRow = TimelineWriterUtils.getRowKeyPrefix(cluster, user, flow,
- runid, appName);
+ byte[] startRow =
+ EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
s.setStartRow(startRow);
s.setMaxVersions(Integer.MAX_VALUE);
- ResultScanner scanner = null;
- TableName entityTableName = TableName
- .valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME);
Connection conn = ConnectionFactory.createConnection(c1);
- Table entityTable = conn.getTable(entityTableName);
+ ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
int rowCount = 0;
int colCount = 0;
- scanner = entityTable.getScanner(s);
for (Result result : scanner) {
if (result != null && !result.isEmpty()) {
rowCount++;
@@ -172,37 +163,77 @@ public class TestHBaseTimelineWriterImpl {
entity));
// check info column family
- NavigableMap<byte[], byte[]> infoValues = result
- .getFamilyMap(EntityColumnFamily.INFO.getInBytes());
- String id1 = TimelineWriterUtils.getValueAsString(
- EntityColumnDetails.ID.getInBytes(), infoValues);
+ String id1 = EntityColumn.ID.readResult(result).toString();
assertEquals(id, id1);
- String type1 = TimelineWriterUtils.getValueAsString(
- EntityColumnDetails.TYPE.getInBytes(), infoValues);
+
+ String type1 = EntityColumn.TYPE.readResult(result).toString();
assertEquals(type, type1);
- Long cTime1 = TimelineWriterUtils.getValueAsLong(
- EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
+
+ Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
+ Long cTime1 = val.longValue();
assertEquals(cTime1, cTime);
- Long mTime1 = TimelineWriterUtils.getValueAsLong(
- EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
+
+ val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
+ Long mTime1 = val.longValue();
assertEquals(mTime1, mTime);
- checkRelatedEntities(isRelatedTo, infoValues,
- EntityColumnDetails.PREFIX_IS_RELATED_TO.getInBytes());
- checkRelatedEntities(relatesTo, infoValues,
- EntityColumnDetails.PREFIX_RELATES_TO.getInBytes());
-
- // check config column family
- NavigableMap<byte[], byte[]> configValuesResult = result
- .getFamilyMap(EntityColumnFamily.CONFIG.getInBytes());
- checkConfigs(configValuesResult, conf);
-
- NavigableMap<byte[], byte[]> metricsResult = result
- .getFamilyMap(EntityColumnFamily.METRICS.getInBytes());
- checkMetricsSizeAndKey(metricsResult, metrics);
- List<Cell> metricCells = result.getColumnCells(
- EntityColumnFamily.METRICS.getInBytes(),
- Bytes.toBytes(m1.getId()));
- checkMetricsTimeseries(metricCells, m1);
+
+ // Remember isRelatedTo is of type Map<String, Set<String>>
+ for (String isRelatedToKey : isRelatedTo.keySet()) {
+ Object isRelatedToValue =
+ EntityColumnPrefix.IS_RELATED_TO.readResult(result,
+ isRelatedToKey);
+ String compoundValue = isRelatedToValue.toString();
+ // id7?id9?id6
+ Set<String> isRelatedToValues =
+ new HashSet<String>(
+ Separator.VALUES.splitEncoded(compoundValue));
+ assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+ isRelatedToValues.size());
+ for (String v : isRelatedTo.get(isRelatedToKey)) {
+ assertTrue(isRelatedToValues.contains(v));
+ }
+ }
+
+ // RelatesTo
+ for (String relatesToKey : relatesTo.keySet()) {
+ String compoundValue =
+ EntityColumnPrefix.RELATES_TO.readResult(result, relatesToKey)
+ .toString();
+ // id3?id4?id5
+ Set<String> relatesToValues =
+ new HashSet<String>(
+ Separator.VALUES.splitEncoded(compoundValue));
+ assertEquals(relatesTo.get(relatesToKey).size(),
+ relatesToValues.size());
+ for (String v : relatesTo.get(relatesToKey)) {
+ assertTrue(relatesToValues.contains(v));
+ }
+ }
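
The two loops above verify a compound-value round trip. As an aside (not part of the patch), that round trip can be sketched in isolation; every call below is one already made elsewhere in this diff, and only the class name is hypothetical:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

// Hypothetical illustration class, not part of the patch.
public class CompoundValueRoundTrip {
  public static void main(String[] args) {
    Set<String> ids = new HashSet<String>(Arrays.asList("id3", "id4", "id5"));
    // Joined into a single cell value of the form id3?id4?id5; a literal '?'
    // inside an id would be escaped by joinEncoded first.
    String compound = Separator.VALUES.joinEncoded(ids);
    // splitEncoded undoes both the join and the escaping.
    Set<String> back =
        new HashSet<String>(Separator.VALUES.splitEncoded(compound));
    if (!ids.equals(back)) {
      throw new AssertionError("round trip failed");
    }
  }
}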
+
+ // Configuration
+ Map<String, Object> configColumns =
+ EntityColumnPrefix.CONFIG.readResults(result);
+ assertEquals(conf.size(), configColumns.size());
+ for (String configItem : conf.keySet()) {
+ assertEquals(conf.get(configItem), configColumns.get(configItem));
+ }
+
+ NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+ EntityColumnPrefix.METRIC.readTimeseriesResults(result);
+
+ NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+ // We got metrics back
+ assertNotNull(metricMap);
+ // Same number of metrics as we wrote
+ assertEquals(metricValues.entrySet().size(), metricMap.entrySet()
+ .size());
+
+ // Iterate over original metrics and confirm that they are present
+ // here.
+ for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
+ assertEquals(metricEntry.getValue(),
+ metricMap.get(metricEntry.getKey()));
+ }
}
}
assertEquals(1, rowCount);
@@ -212,80 +243,77 @@ public class TestHBaseTimelineWriterImpl {
hbi.stop();
hbi.close();
}
- }
-
- private void checkMetricsTimeseries(List<Cell> metricCells,
- TimelineMetric m1) throws IOException {
- Map<Long, Number> timeseries = m1.getValues();
- assertEquals(timeseries.size(), metricCells.size());
- for (Cell c1 : metricCells) {
- assertTrue(timeseries.containsKey(c1.getTimestamp()));
- assertEquals(GenericObjectMapper.read(CellUtil.cloneValue(c1)),
- timeseries.get(c1.getTimestamp()));
- }
- }
-
- private void checkMetricsSizeAndKey(
- NavigableMap<byte[], byte[]> metricsResult, Set<TimelineMetric> metrics) {
- assertEquals(metrics.size(), metricsResult.size());
- for (TimelineMetric m1 : metrics) {
- byte[] key = Bytes.toBytes(m1.getId());
- assertTrue(metricsResult.containsKey(key));
- }
- }
-
- private void checkConfigs(NavigableMap<byte[], byte[]> configValuesResult,
- Map<String, String> conf) throws IOException {
-
- assertEquals(conf.size(), configValuesResult.size());
- byte[] columnName;
- for (String key : conf.keySet()) {
- columnName = Bytes.toBytes(key);
- assertTrue(configValuesResult.containsKey(columnName));
- byte[] value = configValuesResult.get(columnName);
- assertNotNull(value);
- assertEquals(conf.get(key), GenericObjectMapper.read(value));
- }
- }
- private void checkRelatedEntities(Map<String, Set<String>> isRelatedTo,
- NavigableMap<byte[], byte[]> infoValues, byte[] columnPrefix)
- throws IOException {
-
- for (String key : isRelatedTo.keySet()) {
- byte[] columnName = TimelineWriterUtils.join(
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, columnPrefix,
- Bytes.toBytes(key));
-
- byte[] value = infoValues.get(columnName);
- assertNotNull(value);
- String isRelatedToEntities = GenericObjectMapper.read(value).toString();
- assertNotNull(isRelatedToEntities);
- assertEquals(
- TimelineWriterUtils.getValueAsString(
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR,
- isRelatedTo.get(key)), isRelatedToEntities);
- }
+ // Somewhat of a hack: this is not a separate test, in order to avoid
+ // having to deal with test case execution order.
+ testAdditionalEntity();
}
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
String flow, Long runid, String appName, TimelineEntity te) {
- byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey,
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES);
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
assertTrue(rowKeyComponents.length == 7);
assertEquals(user, Bytes.toString(rowKeyComponents[0]));
assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
- assertEquals(TimelineWriterUtils.encodeRunId(runid),
- Bytes.toLong(rowKeyComponents[3]));
- assertEquals(TimelineWriterUtils.cleanse(appName), Bytes.toString(rowKeyComponents[4]));
+ assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+ assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
return true;
}
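
One note on the EntityRowKey.invert assertion above: storing an inverted run id makes newer runs sort ahead of older ones in HBase's ascending key order. A minimal sketch of that idea, assuming the inversion is simple Long.MAX_VALUE arithmetic (EntityRowKey itself ships in this patch but its body is not shown in this part of the mail):

// Hedged sketch; the invert() body here is an assumption for illustration.
public class RunIdInversionSketch {
  static long invert(long key) {
    return Long.MAX_VALUE - key; // assumed implementation
  }

  public static void main(String[] args) {
    long olderRun = 1000L;
    long newerRun = 2000L;
    // Inverted, the newer run id is the smaller value, so in HBase's
    // ascending byte order the most recent run scans first.
    if (!(invert(newerRun) < invert(olderRun))) {
      throw new AssertionError();
    }
  }
}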
+ private void testAdditionalEntity() throws IOException {
+ TimelineEvent event = new TimelineEvent();
+ event.setId("foo_event_id");
+ event.setTimestamp(System.currentTimeMillis());
+ event.addInfo("foo_event", "test");
+
+ final TimelineEntity entity = new TimelineEntity();
+ entity.setId("attempt_1329348432655_0001_m_000008_18");
+ entity.setType("FOO_ATTEMPT");
+
+ TimelineEntities entities = new TimelineEntities();
+ entities.addEntity(entity);
+
+ HBaseTimelineWriterImpl hbi = null;
+ try {
+ Configuration c1 = util.getConfiguration();
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String cluster = "cluster2";
+ String user = "user2";
+ String flow = "other_flow_name";
+ String flowVersion = "1111F01C2287BA";
+ long runid = 1009876543218L;
+ String appName = "some app name";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+ hbi.stop();
+ // scan the table and see that entity exists
+ Scan s = new Scan();
+ byte[] startRow =
+ EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+ s.setStartRow(startRow);
+ s.setMaxVersions(Integer.MAX_VALUE);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+ int rowCount = 0;
+ for (Result result : scanner) {
+ if (result != null && !result.isEmpty()) {
+ rowCount++;
+ }
+ }
+ assertEquals(1, rowCount);
+
+ } finally {
+ hbi.stop();
+ hbi.close();
+ }
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
new file mode 100644
index 0000000..8b25a83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class TestSeparator {
+
+ private static String villain = "Dr. Heinz Doofenshmirtz";
+ private static String special =
+ ". * | ? + ( ) [ ] { } ^ $ \\ \"";
+
+ /**
+ * Tests that encoding followed by decoding returns the original string,
+ * for every separator.
+ */
+ @Test
+ public void testEncodeDecodeString() {
+
+ for (Separator separator : Separator.values()) {
+ testEncodeDecode(separator, "");
+ testEncodeDecode(separator, " ");
+ testEncodeDecode(separator, "!");
+ testEncodeDecode(separator, "?");
+ testEncodeDecode(separator, "&");
+ testEncodeDecode(separator, "+");
+ testEncodeDecode(separator, "Dr.");
+ testEncodeDecode(separator, "Heinz");
+ testEncodeDecode(separator, "Doofenshmirtz");
+ testEncodeDecode(separator, villain);
+ testEncodeDecode(separator, special);
+
+ assertNull(separator.encode(null));
+
+ }
+ }
+
+ private void testEncodeDecode(Separator separator, String token) {
+ String encoded = separator.encode(token);
+ String decoded = separator.decode(encoded);
+ String msg = "token:" + token + " separator:" + separator + ".";
+ assertEquals(msg, token, decoded);
+ }
+
+ @Test
+ public void testEncodeDecode() {
+ testEncodeDecode("Dr.", Separator.QUALIFIERS);
+ testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS);
+ testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null,
+ Separator.QUALIFIERS);
+ testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null);
+ testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE);
+ testEncodeDecode("Platypus...", (Separator) null);
+ testEncodeDecode("The what now ?!?", Separator.QUALIFIERS,
+ Separator.VALUES, Separator.SPACE);
+
+ }
+
+ /**
+ * Simple test to encode and decode using the same separators, confirming
+ * that we end up with what we started with.
+ *
+ * @param token the string to round-trip
+ * @param separators the separators to encode and decode with
+ */
+ private static void testEncodeDecode(String token, Separator... separators) {
+ byte[] encoded = Separator.encode(token, separators);
+ String decoded = Separator.decode(encoded, separators);
+ assertEquals(token, decoded);
+ }
+
+ @Test
+ public void testJoinStripped() {
+ List<String> stringList = new ArrayList<String>(0);
+ stringList.add("nothing");
+
+ String joined = Separator.VALUES.joinEncoded(stringList);
+ Iterable<String> split = Separator.VALUES.splitEncoded(joined);
+ assertTrue(Iterables.elementsEqual(stringList, split));
+
+ stringList = new ArrayList<String>(3);
+ stringList.add("a");
+ stringList.add("b?");
+ stringList.add("c");
+
+ joined = Separator.VALUES.joinEncoded(stringList);
+ split = Separator.VALUES.splitEncoded(joined);
+ assertTrue(Iterables.elementsEqual(stringList, split));
+
+ String[] stringArray1 = { "else" };
+ joined = Separator.VALUES.joinEncoded(stringArray1);
+ split = Separator.VALUES.splitEncoded(joined);
+ assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split));
+
+ String[] stringArray2 = { "d", "e?", "f" };
+ joined = Separator.VALUES.joinEncoded(stringArray2);
+ split = Separator.VALUES.splitEncoded(joined);
+ assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split));
+
+ List<String> empty = new ArrayList<String>(0);
+ split = Separator.VALUES.splitEncoded(null);
+ assertTrue(Iterables.elementsEqual(empty, split));
+
+ }
+
+}
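
To make the property these tests pin down concrete: encode() escapes the separator's own character (via an internal escape token) so the raw character never appears in an encoded string, and decode() reverses it; that is what makes the character safe to use as a delimiter. A rough sketch, where the no-raw-separator property is an assumption illustrated rather than something the tests above assert directly:

import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

// Hypothetical illustration class, not part of the patch.
public class SeparatorEscapeSketch {
  public static void main(String[] args) {
    String token = "b?c"; // contains the VALUES separator character
    String encoded = Separator.VALUES.encode(token);
    // Assumed property: no raw '?' survives encoding.
    if (encoded.contains("?")) {
      throw new AssertionError("separator leaked into encoded form");
    }
    if (!token.equals(Separator.VALUES.decode(encoded))) {
      throw new AssertionError("decode did not restore the original");
    }
  }
}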
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java
new file mode 100644
index 0000000..4f96f87
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.junit.Test;
+
+public class TestTimelineWriterUtils {
+
+ @Test
+ public void test() {
+ // TODO: implement a test for the remaining method in TimelineWriterUtils.
+ }
+
+}
[2/2] hadoop git commit: YARN-3706. Generalize native HBase writer
for additional tables (Joep Rottinghuis via sjlee)
Posted by sj...@apache.org.
YARN-3706. Generalize native HBase writer for additional tables (Joep Rottinghuis via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9137aeae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9137aeae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9137aeae
Branch: refs/heads/YARN-2928
Commit: 9137aeae0dec83f9eff40d12cae712dfd508c0c5
Parents: a1bb913
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Jun 18 10:49:20 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Jun 18 10:49:20 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../storage/EntityColumnDetails.java | 110 ------
.../storage/EntityColumnFamily.java | 95 -----
.../storage/HBaseTimelineWriterImpl.java | 114 +++---
.../server/timelineservice/storage/Range.java | 59 ----
.../storage/TimelineEntitySchemaConstants.java | 71 ----
.../storage/TimelineSchemaCreator.java | 134 +-------
.../timelineservice/storage/TimelineWriter.java | 3 +-
.../storage/TimelineWriterUtils.java | 344 -------------------
.../storage/common/BaseTable.java | 118 +++++++
.../common/BufferedMutatorDelegator.java | 73 ++++
.../timelineservice/storage/common/Column.java | 59 ++++
.../storage/common/ColumnFamily.java | 34 ++
.../storage/common/ColumnHelper.java | 247 +++++++++++++
.../storage/common/ColumnPrefix.java | 83 +++++
.../timelineservice/storage/common/Range.java | 59 ++++
.../storage/common/Separator.java | 303 ++++++++++++++++
.../common/TimelineEntitySchemaConstants.java | 68 ++++
.../storage/common/TimelineWriterUtils.java | 127 +++++++
.../storage/common/TypedBufferedMutator.java | 28 ++
.../storage/common/package-info.java | 24 ++
.../storage/entity/EntityColumn.java | 141 ++++++++
.../storage/entity/EntityColumnFamily.java | 65 ++++
.../storage/entity/EntityColumnPrefix.java | 212 ++++++++++++
.../storage/entity/EntityRowKey.java | 93 +++++
.../storage/entity/EntityTable.java | 161 +++++++++
.../storage/entity/package-info.java | 25 ++
.../storage/TestHBaseTimelineWriterImpl.java | 252 ++++++++------
.../storage/common/TestSeparator.java | 129 +++++++
.../storage/common/TestTimelineWriterUtils.java | 29 ++
30 files changed, 2301 insertions(+), 962 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 040afea..197a154 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -93,6 +93,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
zjshen)
+ YARN-3706. Generalize native HBase writer for additional tables (Joep
+ Rottinghuis via sjlee)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
deleted file mode 100644
index 2894c41..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Info Column Family details like Column names, types and byte
- * representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase Also has utility functions for storing each of
- * these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnDetails {
- ID(EntityColumnFamily.INFO, "id"),
- TYPE(EntityColumnFamily.INFO, "type"),
- CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
- MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
- FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"),
- PREFIX_IS_RELATED_TO(EntityColumnFamily.INFO, "r"),
- PREFIX_RELATES_TO(EntityColumnFamily.INFO, "s"),
- PREFIX_EVENTS(EntityColumnFamily.INFO, "e");
-
- private final EntityColumnFamily columnFamily;
- private final String value;
- private final byte[] inBytes;
-
- private EntityColumnDetails(EntityColumnFamily columnFamily,
- String value) {
- this.columnFamily = columnFamily;
- this.value = value;
- this.inBytes = Bytes.toBytes(this.value.toLowerCase());
- }
-
- public String getValue() {
- return value;
- }
-
- byte[] getInBytes() {
- return inBytes;
- }
-
- void store(byte[] rowKey, BufferedMutator entityTable, Object inputValue)
- throws IOException {
- TimelineWriterUtils.store(rowKey, entityTable,
- this.columnFamily.getInBytes(), null, this.getInBytes(), inputValue,
- null);
- }
-
- /**
- * stores events data with column prefix
- */
- void store(byte[] rowKey, BufferedMutator entityTable, byte[] idBytes,
- String key, Object inputValue) throws IOException {
- TimelineWriterUtils.store(rowKey, entityTable,
- this.columnFamily.getInBytes(),
- // column prefix
- TimelineWriterUtils.join(
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
- this.getInBytes(), idBytes),
- // column qualifier
- Bytes.toBytes(key),
- inputValue, null);
- }
-
- /**
- * stores relation entities with a column prefix
- */
- void store(byte[] rowKey, BufferedMutator entityTable, String key,
- Set<String> inputValue) throws IOException {
- TimelineWriterUtils.store(rowKey, entityTable,
- this.columnFamily.getInBytes(),
- // column prefix
- this.getInBytes(),
- // column qualifier
- Bytes.toBytes(key),
- // value
- TimelineWriterUtils.getValueAsString(
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, inputValue),
- // cell timestamp
- null);
- }
-
- // TODO add a method that accepts a byte array,
- // iterates over the enum and returns an enum from those bytes
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
deleted file mode 100644
index e556351..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Column family names and byte representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase
- * Also has utility functions for storing each of these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnFamily {
- INFO("i"),
- CONFIG("c"),
- METRICS("m");
-
- private final String value;
- private final byte[] inBytes;
-
- private EntityColumnFamily(String value) {
- this.value = value;
- this.inBytes = Bytes.toBytes(this.value.toLowerCase());
- }
-
- byte[] getInBytes() {
- return inBytes;
- }
-
- public String getValue() {
- return value;
- }
-
- /**
- * stores the key as column and value as hbase column value in the given
- * column family in the entity table
- *
- * @param rowKey
- * @param entityTable
- * @param inputValue
- * @throws IOException
- */
- public void store(byte[] rowKey, BufferedMutator entityTable, String key,
- String inputValue) throws IOException {
- if (key == null) {
- return;
- }
- TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
- Bytes.toBytes(key), inputValue, null);
- }
-
- /**
- * stores the values along with cell timestamp
- *
- * @param rowKey
- * @param entityTable
- * @param key
- * @param timestamp
- * @param inputValue
- * @throws IOException
- */
- public void store(byte[] rowKey, BufferedMutator entityTable, String key,
- Long timestamp, Number inputValue) throws IOException {
- if (key == null) {
- return;
- }
- TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
- Bytes.toBytes(key), inputValue, timestamp);
- }
-
- // TODO add a method that accepts a byte array,
- // iterates over the enum and returns an enum from those bytes
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index aa71c6c..e48ca60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -26,19 +26,22 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
/**
* This implements a hbase based backend for storing application timeline entity
@@ -50,7 +53,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
TimelineWriter {
private Connection conn;
- private BufferedMutator entityTable;
+ private TypedBufferedMutator<EntityTable> entityTable;
private static final Log LOG = LogFactory
.getLog(HBaseTimelineWriterImpl.class);
@@ -72,10 +75,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
super.serviceInit(conf);
Configuration hbaseConf = HBaseConfiguration.create(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
- TableName entityTableName = TableName.valueOf(hbaseConf.get(
- TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
- TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
- entityTable = conn.getBufferedMutator(entityTableName);
+ entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
}
/**
@@ -86,9 +86,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntities data) throws IOException {
- byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId,
- userId, flowName, flowRunId, appId);
-
TimelineWriteResponse putStatus = new TimelineWriteResponse();
for (TimelineEntity te : data.getEntities()) {
@@ -96,19 +93,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (te == null) {
continue;
}
- // get row key
- byte[] row = TimelineWriterUtils.join(
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix,
- Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId()));
-
- storeInfo(row, te, flowVersion);
- storeEvents(row, te.getEvents());
- storeConfig(row, te.getConfigs());
- storeMetrics(row, te.getMetrics());
- storeRelations(row, te.getIsRelatedToEntities(),
- EntityColumnDetails.PREFIX_IS_RELATED_TO);
- storeRelations(row, te.getRelatesToEntities(),
- EntityColumnDetails.PREFIX_RELATES_TO);
+
+ byte[] rowKey =
+ EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
+ te);
+
+ storeInfo(rowKey, te, flowVersion);
+ storeEvents(rowKey, te.getEvents());
+ storeConfig(rowKey, te.getConfigs());
+ storeMetrics(rowKey, te.getMetrics());
+ storeRelations(rowKey, te.getIsRelatedToEntities(),
+ EntityColumnPrefix.IS_RELATED_TO);
+ storeRelations(rowKey, te.getRelatesToEntities(),
+ EntityColumnPrefix.RELATES_TO);
}
return putStatus;
@@ -119,10 +116,15 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
*/
private void storeRelations(byte[] rowKey,
Map<String, Set<String>> connectedEntities,
- EntityColumnDetails columnNamePrefix) throws IOException {
- for (Map.Entry<String, Set<String>> entry : connectedEntities.entrySet()) {
- columnNamePrefix.store(rowKey, entityTable, entry.getKey(),
- entry.getValue());
+ EntityColumnPrefix entityColumnPrefix) throws IOException {
+ for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+ .entrySet()) {
+ // id3?id4?id5
+ String compoundValue =
+ Separator.VALUES.joinEncoded(connectedEntity.getValue());
+
+ entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(),
+ null, compoundValue);
}
}
@@ -132,13 +134,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
throws IOException {
- EntityColumnDetails.ID.store(rowKey, entityTable, te.getId());
- EntityColumnDetails.TYPE.store(rowKey, entityTable, te.getType());
- EntityColumnDetails.CREATED_TIME.store(rowKey, entityTable,
+ EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+ EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+ EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
te.getCreatedTime());
- EntityColumnDetails.MODIFIED_TIME.store(rowKey, entityTable,
+ EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
te.getModifiedTime());
- EntityColumnDetails.FLOW_VERSION.store(rowKey, entityTable, flowVersion);
+ EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
}
/**
@@ -150,8 +152,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
return;
}
for (Map.Entry<String, String> entry : config.entrySet()) {
- EntityColumnFamily.CONFIG.store(rowKey, entityTable,
- entry.getKey(), entry.getValue());
+ EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
+ null, entry.getValue());
}
}
@@ -163,11 +165,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
throws IOException {
if (metrics != null) {
for (TimelineMetric metric : metrics) {
- String key = metric.getId();
+ String metricColumnQualifier = metric.getId();
Map<Long, Number> timeseries = metric.getValues();
- for (Map.Entry<Long, Number> entry : timeseries.entrySet()) {
- EntityColumnFamily.METRICS.store(rowKey, entityTable, key,
- entry.getKey(), entry.getValue());
+ for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+ Long timestamp = timeseriesEntry.getKey();
+ EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+ metricColumnQualifier, timestamp, timeseriesEntry.getValue());
}
}
}
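
The METRIC prefix above passes the metric timestamp through to the cell, so a timeseries is stored as multiple versions of one column rather than as separate columns. A minimal HBase sketch of the same idea, independent of this patch's helpers (the family name "m" and the qualifier are stand-ins, not the patch's constants):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical illustration class, not part of the patch.
public class MetricCellSketch {
  static Put metricPut(byte[] row) {
    Put put = new Put(row);
    // Metric id becomes the column qualifier; the metric timestamp becomes
    // the HBase cell timestamp, so each timeseries point is one cell version.
    put.addColumn(Bytes.toBytes("m"), Bytes.toBytes("map_input_records"),
        1392993084018L, Bytes.toBytes(42L));
    return put;
  }
}
// Scanning with setMaxVersions (as the test above does) returns all cell
// versions, reconstituting the timeseries.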
@@ -181,19 +184,27 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (events != null) {
for (TimelineEvent event : events) {
if (event != null) {
- String id = event.getId();
- if (id != null) {
- byte[] idBytes = Bytes.toBytes(id);
+ String eventId = event.getId();
+ if (eventId != null) {
Map<String, Object> eventInfo = event.getInfo();
if (eventInfo != null) {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
- EntityColumnDetails.PREFIX_EVENTS.store(rowKey,
- entityTable, idBytes, info.getKey(), info.getValue());
- }
+ // eventId?infoKey
+ byte[] columnQualifierFirst =
+ Bytes.toBytes(Separator.VALUES.encode(eventId));
+ byte[] compoundColumnQualifierBytes =
+ Separator.VALUES.join(columnQualifierFirst,
+ Bytes.toBytes(info.getKey()));
+ // Convert back to a String so that store() does not need an
+ // additional byte[] qualifier overload.
+ String compoundColumnQualifier =
+ Bytes.toString(compoundColumnQualifierBytes);
+ EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+ compoundColumnQualifier, null, info.getValue());
+ } // for info: eventInfo
}
}
}
- }
+ } // event : events
}
}
@@ -204,8 +215,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
}
/**
- * close the hbase connections
- * The close APIs perform flushing and release any
+ * Close the HBase connections. The close APIs perform flushing and release any
* resources held
*/
@Override
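
The event loop above builds its compound column qualifier by encoding the event id first and then joining it to the raw info key, so separator characters inside the event id cannot collide with the delimiter. The same construction in isolation (the calls are identical to the loop's; the wrapper class is hypothetical):

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

// Hypothetical illustration class, not part of the patch.
public class EventQualifierSketch {
  static String compoundQualifier(String eventId, String infoKey) {
    // Escape separator characters inside the event id, then join it to the
    // raw info key with the VALUES separator: eventId?infoKey.
    byte[] first = Bytes.toBytes(Separator.VALUES.encode(eventId));
    byte[] qualifier = Separator.VALUES.join(first, Bytes.toBytes(infoKey));
    return Bytes.toString(qualifier);
  }
}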
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
deleted file mode 100644
index 2a2db81..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class Range {
- private final int startIdx;
- private final int endIdx;
-
- /**
- * Defines a range from start index (inclusive) to end index (exclusive).
- *
- * @param start
- * Starting index position
- * @param end
- * Ending index position (exclusive)
- */
- public Range(int start, int end) {
- if (start < 0 || end < start) {
- throw new IllegalArgumentException(
- "Invalid range, required that: 0 <= start <= end; start=" + start
- + ", end=" + end);
- }
-
- this.startIdx = start;
- this.endIdx = end;
- }
-
- public int start() {
- return startIdx;
- }
-
- public int end() {
- return endIdx;
- }
-
- public int length() {
- return endIdx - startIdx;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
deleted file mode 100644
index d95cbb2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * contains the constants used in the context of schema accesses for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * information
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
-
- /** entity prefix */
- public static final String ENTITY_PREFIX =
- YarnConfiguration.TIMELINE_SERVICE_PREFIX
- + ".entity";
-
- /** config param name that specifies the entity table name */
- public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX
- + ".table.name";
-
- /**
- * config param name that specifies the TTL for metrics column family in
- * entity table
- */
- public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX
- + ".table.metrics.ttl";
-
- /** default value for entity table name */
- public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity";
-
- /** in bytes default value for entity table name */
- static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes
- .toBytes(DEFAULT_ENTITY_TABLE_NAME);
-
- /** separator in row key */
- public static final String ROW_KEY_SEPARATOR = "!";
-
- /** byte representation of the separator in row key */
- static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
- .toBytes(ROW_KEY_SEPARATOR);
-
- public static final byte ZERO_BYTES = 0;
-
- /** default TTL is 30 days for metrics timeseries */
- public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000;
-
- /** default max number of versions */
- public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 820a6d1..a5cc2ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -19,21 +19,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -41,7 +26,18 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
/**
* This creates the schema for a hbase based backend for storing application
@@ -53,18 +49,6 @@ public class TimelineSchemaCreator {
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
- final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"),
- Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"),
- Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"),
- Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
- Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"),
- Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
- Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"),
- Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"),
- Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"),
- Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
-
- public static final String SPLIT_KEY_PREFIX_LENGTH = "4";
public static void main(String[] args) throws Exception {
@@ -79,13 +63,12 @@ public class TimelineSchemaCreator {
// Grab the entityTableName argument
String entityTableName = commandLine.getOptionValue("e");
if (StringUtils.isNotBlank(entityTableName)) {
- hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
- entityTableName);
+ hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
}
- String entityTable_TTL_Metrics = commandLine.getOptionValue("m");
- if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) {
- hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
- entityTable_TTL_Metrics);
+ String entityTableTTLMetrics = commandLine.getOptionValue("m");
+ if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+ int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
+ new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
}
createAllTables(hbaseConf);
}
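
For completeness, the per-table path that main() drives above can also be invoked programmatically. A hedged sketch of creating the entity table with a custom metrics TTL, using only calls that appear in this file (the table name and TTL are example values, not defaults taken from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;

// Hypothetical illustration class, not part of the patch.
public class CreateEntityTableSketch {
  public static void main(String[] args) throws Exception {
    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, "timelineservice.entity");
    new EntityTable().setMetricsTTL(7 * 24 * 3600, hbaseConf); // TTL in seconds
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
      // Throws IOException if the table already exists, like the removed
      // createTimelineEntityTable did.
      new EntityTable().createTable(conn.getAdmin(), hbaseConf);
    }
  }
}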
@@ -136,7 +119,7 @@ public class TimelineSchemaCreator {
if (admin == null) {
throw new IOException("Cannot create table since admin is null");
}
- createTimelineEntityTable(admin, hbaseConf);
+ new EntityTable().createTable(admin, hbaseConf);
} finally {
if (conn != null) {
conn.close();
@@ -144,88 +127,5 @@ public class TimelineSchemaCreator {
}
}
- /**
- * Creates a table with column families info, config and metrics
- * info stores information about a timeline entity object
- * config stores configuration data of a timeline entity object
- * metrics stores the metrics of a timeline entity object
- *
- * Example entity table record:
- * <pre>
- *|------------------------------------------------------------|
- *| Row | Column Family | Column Family | Column Family|
- *| key | info | metrics | config |
- *|------------------------------------------------------------|
- *| userName! | id:entityId | metricName1: | configKey1: |
- *| clusterId! | | metricValue1 | configValue1 |
- *| flowId! | type:entityType| @timestamp1 | |
- *| flowRunId! | | | configKey2: |
- *| AppId! | created_time: | metricName1: | configValue2 |
- *| entityType!| 1392993084018 | metricValue2 | |
- *| entityId | | @timestamp2 | |
- *| | modified_time: | | |
- *| | 1392995081012 | metricName2: | |
- *| | | metricValue1 | |
- *| | r!relatesToKey:| @timestamp2 | |
- *| | id3!id4!id5 | | |
- *| | | | |
- *| | s!isRelatedToKey| | |
- *| | id7!id9!id5 | | |
- *| | | | |
- *| | e!eventKey: | | |
- *| | eventValue | | |
- *| | | | |
- *| | flowVersion: | | |
- *| | versionValue | | |
- *|------------------------------------------------------------|
- *</pre>
- * @param admin
- * @param hbaseConf
- * @throws IOException
- */
- public static void createTimelineEntityTable(Admin admin,
- Configuration hbaseConf) throws IOException {
-
- TableName table = TableName.valueOf(hbaseConf.get(
- TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
- TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
- if (admin.tableExists(table)) {
- // do not disable / delete existing table
- // similar to the approach taken by map-reduce jobs when
- // output directory exists
- throw new IOException("Table " + table.getNameAsString()
- + " already exists.");
- }
-
- HTableDescriptor entityTableDescp = new HTableDescriptor(table);
- HColumnDescriptor cf1 = new HColumnDescriptor(
- EntityColumnFamily.INFO.getInBytes());
- cf1.setBloomFilterType(BloomType.ROWCOL);
- entityTableDescp.addFamily(cf1);
-
- HColumnDescriptor cf2 = new HColumnDescriptor(
- EntityColumnFamily.CONFIG.getInBytes());
- cf2.setBloomFilterType(BloomType.ROWCOL);
- cf2.setBlockCacheEnabled(true);
- entityTableDescp.addFamily(cf2);
-
- HColumnDescriptor cf3 = new HColumnDescriptor(
- EntityColumnFamily.METRICS.getInBytes());
- entityTableDescp.addFamily(cf3);
- cf3.setBlockCacheEnabled(true);
- // always keep 1 version (the latest)
- cf3.setMinVersions(1);
- cf3.setMaxVersions(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT);
- cf3.setTimeToLive(hbaseConf.getInt(
- TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
- TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT));
- entityTableDescp
- .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
- entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
- SPLIT_KEY_PREFIX_LENGTH);
- admin.createTable(entityTableDescp, splits);
- LOG.info("Status of table creation for " + table.getNameAsString() + "="
- + admin.tableExists(table));
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 467bcec..494e8ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -15,17 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.service.Service;
/**
* This interface is for storing application timeline information.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
deleted file mode 100644
index 113935e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.Range;
-
-/**
- * Utility functions used across TimelineWriter classes.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineWriterUtils {
-
- /** empty bytes */
- public static final byte[] EMPTY_BYTES = new byte[0];
- private static final String SPACE = " ";
- private static final String UNDERSCORE = "_";
- private static final String EMPTY_STRING = "";
-
- /**
- * Returns a single byte array containing all of the individual component
- * arrays separated by the separator array.
- *
- * @param separator
- * @param components
- * @return byte array after joining the components
- */
- public static byte[] join(byte[] separator, byte[]... components) {
- if (components == null || components.length == 0) {
- return EMPTY_BYTES;
- }
-
- int finalSize = 0;
- if (separator != null) {
- finalSize = separator.length * (components.length - 1);
- }
- for (byte[] comp : components) {
- if (comp != null) {
- finalSize += comp.length;
- }
- }
-
- byte[] buf = new byte[finalSize];
- int offset = 0;
- for (int i = 0; i < components.length; i++) {
- if (components[i] != null) {
- System.arraycopy(components[i], 0, buf, offset, components[i].length);
- offset += components[i].length;
- if (i < (components.length - 1) && separator != null
- && separator.length > 0) {
- System.arraycopy(separator, 0, buf, offset, separator.length);
- offset += separator.length;
- }
- }
- }
- return buf;
- }
-
- /**
-   * Splits the source array into multiple array segments using the given
-   * separator. This will naturally produce
- * copied byte arrays for each of the split segments. To identify the split
- * ranges without the array copies, see
- * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
- *
- * @param source
- * @param separator
- * @return byte[] array after splitting the source
- */
- public static byte[][] split(byte[] source, byte[] separator) {
- return split(source, separator, -1);
- }
-
- /**
- * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of limit items. This will naturally produce
- * copied byte arrays for each of the split segments. To identify the split
- * ranges without the array copies, see
- * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
- *
- * @param source
- * @param separator
- * @param limit
- * @return byte[][] after splitting the input source
- */
- public static byte[][] split(byte[] source, byte[] separator, int limit) {
- List<Range> segments = splitRanges(source, separator, limit);
-
- byte[][] splits = new byte[segments.size()][];
- for (int i = 0; i < segments.size(); i++) {
- Range r = segments.get(i);
- byte[] tmp = new byte[r.length()];
- if (tmp.length > 0) {
- System.arraycopy(source, r.start(), tmp, 0, r.length());
- }
- splits[i] = tmp;
- }
- return splits;
- }
-
- /**
- * Returns a list of ranges identifying [start, end) -- closed, open --
- * positions within the source byte array that would be split using the
- * separator byte array.
- */
- public static List<Range> splitRanges(byte[] source, byte[] separator) {
- return splitRanges(source, separator, -1);
- }
-
- /**
- * Returns a list of ranges identifying [start, end) -- closed, open --
- * positions within the source byte array that would be split using the
- * separator byte array.
- * @param source the source data
- * @param separator the separator pattern to look for
- * @param limit the maximum number of splits to identify in the source
- */
- public static List<Range> splitRanges(byte[] source, byte[] separator, int limit) {
- List<Range> segments = new ArrayList<Range>();
- if ((source == null) || (separator == null)) {
- return segments;
- }
- int start = 0;
- itersource: for (int i = 0; i < source.length; i++) {
- for (int j = 0; j < separator.length; j++) {
-        if (i + j >= source.length || source[i + j] != separator[j]) {
- continue itersource;
- }
- }
- // all separator elements matched
- if (limit > 0 && segments.size() >= (limit-1)) {
- // everything else goes in one final segment
- break;
- }
-
- segments.add(new Range(start, i));
- start = i + separator.length;
- // i will be incremented again in outer for loop
- i += separator.length-1;
- }
- // add in remaining to a final range
- if (start <= source.length) {
- segments.add(new Range(start, source.length));
- }
- return segments;
- }
-
- /**
-   * Converts the run id into its inverse timestamp.
- * @param flowRunId
- * @return inverted long
- */
- public static long encodeRunId(Long flowRunId) {
- return Long.MAX_VALUE - flowRunId;
- }
-
- /**
- * return a value from the Map as a String
- * @param key
- * @param values
- * @return value as a String or ""
- * @throws IOException
- */
- public static String getValueAsString(final byte[] key,
- final Map<byte[], byte[]> values) throws IOException {
-    if (values == null) {
- return EMPTY_STRING;
- }
- byte[] value = values.get(key);
- if (value != null) {
- return GenericObjectMapper.read(value).toString();
- } else {
- return EMPTY_STRING;
- }
- }
-
- /**
- * return a value from the Map as a long
- * @param key
- * @param values
- * @return value as Long or 0L
- * @throws IOException
- */
- public static long getValueAsLong(final byte[] key,
- final Map<byte[], byte[]> values) throws IOException {
- if (values == null) {
- return 0;
- }
- byte[] value = values.get(key);
- if (value != null) {
- Number val = (Number) GenericObjectMapper.read(value);
- return val.longValue();
- } else {
- return 0L;
- }
- }
-
- /**
-   * Concatenates the values from a Set<String> into a single delimited string.
- * @param rowKeySeparator
- * @param values
- * @return Value from the set of strings as a string
- */
- public static String getValueAsString(String rowKeySeparator,
- Set<String> values) {
-
- if (values == null) {
- return EMPTY_STRING;
- }
- StringBuilder concatStrings = new StringBuilder();
- for (String value : values) {
- concatStrings.append(value);
- concatStrings.append(rowKeySeparator);
- }
- // remove the last separator
-    if (concatStrings.length() > 1) {
- concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator));
- }
- return concatStrings.toString();
- }
- /**
- * Constructs a row key prefix for the entity table
- * @param clusterId
- * @param userId
- * @param flowId
- * @param flowRunId
- * @param appId
- * @return byte array with the row key prefix
- */
- static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId,
- Long flowRunId, String appId) {
- return TimelineWriterUtils.join(
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
- Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)),
- Bytes.toBytes(cleanse(flowId)),
- Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)),
- Bytes.toBytes(cleanse(appId)));
- }
-
- /**
- * Takes a string token to be used as a key or qualifier and
- * cleanses out reserved tokens.
- * This operation is not symmetrical.
- * Logic is to replace all spaces and separator chars in input with
- * underscores.
- *
- * @param token token to cleanse.
- * @return String with no spaces and no separator chars
- */
- public static String cleanse(String token) {
- if (token == null || token.length() == 0) {
- return token;
- }
-
- String cleansed = token.replaceAll(SPACE, UNDERSCORE);
- cleansed = cleansed.replaceAll(
- TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE);
-
- return cleansed;
- }
-
- /**
-   * Stores the info to the table in HBase.
- *
- * @param rowKey
- * @param table
- * @param columnFamily
- * @param columnPrefix
- * @param columnQualifier
- * @param inputValue
- * @param cellTimeStamp
- * @throws IOException
- */
- public static void store(byte[] rowKey, BufferedMutator table, byte[] columnFamily,
- byte[] columnPrefix, byte[] columnQualifier, Object inputValue,
- Long cellTimeStamp) throws IOException {
- if ((rowKey == null) || (table == null) || (columnFamily == null)
- || (columnQualifier == null) || (inputValue == null)) {
- return;
- }
-
- Put p = null;
- if (cellTimeStamp == null) {
- if (columnPrefix != null) {
- // store with prefix
- p = new Put(rowKey);
- p.addColumn(
- columnFamily,
- join(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
- columnPrefix, columnQualifier), GenericObjectMapper
- .write(inputValue));
- } else {
- // store without prefix
- p = new Put(rowKey);
- p.addColumn(columnFamily, columnQualifier,
- GenericObjectMapper.write(inputValue));
- }
- } else {
- // store with cell timestamp
- Cell cell = CellUtil.createCell(rowKey, columnFamily, columnQualifier,
- // set the cell timestamp
- cellTimeStamp,
- // KeyValue Type minimum
- TimelineEntitySchemaConstants.ZERO_BYTES,
- GenericObjectMapper.write(inputValue));
- p = new Put(rowKey);
- p.add(cell);
- }
- if (p != null) {
- table.mutate(p);
- }
-
- }
-
-}
\ No newline at end of file
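
For reference, the removed helpers above behaved as follows. A minimal sketch (RemovedUtilsDemo is a hypothetical demo class; it assumes TimelineWriterUtils and Range as they existed before this commit) showing the join/split round trip and the run-id inversion that makes newer runs sort first:

import org.apache.hadoop.hbase.util.Bytes;

public class RemovedUtilsDemo {
  public static void main(String[] args) {
    byte[] sep = Bytes.toBytes("!");
    // join places the separator between each pair of components.
    byte[] key = TimelineWriterUtils.join(sep,
        Bytes.toBytes("user"), Bytes.toBytes("cluster"));
    // split copies out each segment; splitRanges would instead return the
    // [start, end) positions without copying.
    byte[][] parts = TimelineWriterUtils.split(key, sep);
    System.out.println(Bytes.toString(parts[0])); // user
    System.out.println(Bytes.toString(parts[1])); // cluster
    // Inverted run ids sort more recent runs ahead of older ones in
    // HBase's ascending row key order.
    System.out.println(TimelineWriterUtils.encodeRunId(2000L)
        < TimelineWriterUtils.encodeRunId(1000L)); // true
  }
}
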
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
new file mode 100644
index 0000000..e8d8b5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * Implements behavior common to tables used in the timeline service storage.
+ *
+ * @param <T> reference to the table instance class itself for type safety.
+ */
+public abstract class BaseTable<T> {
+
+ /**
+ * Name of config variable that is used to point to this table
+ */
+ private final String tableNameConfName;
+
+ /**
+ * Unless the configuration overrides, this will be the default name for the
+ * table when it is created.
+ */
+ private final String defaultTableName;
+
+ /**
+ * @param tableNameConfName name of config variable that is used to point to
+   *          this table.
+   * @param defaultTableName name the table will take when created, unless
+   *          overridden by configuration.
+   */
+ protected BaseTable(String tableNameConfName, String defaultTableName) {
+ this.tableNameConfName = tableNameConfName;
+ this.defaultTableName = defaultTableName;
+ }
+
+ /**
+ * Used to create a type-safe mutator for this table.
+ *
+ * @param hbaseConf used to read table name
+   * @param conn used to create the mutator from.
+   * @return a type safe {@link BufferedMutator} for this table.
+ * @throws IOException
+ */
+ public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf,
+ Connection conn) throws IOException {
+
+ TableName tableName = this.getTableName(hbaseConf);
+
+ // Plain buffered mutator
+ BufferedMutator bufferedMutator = conn.getBufferedMutator(tableName);
+
+ // Now make this thing type safe.
+    // Service initialization should hang on to this variable with the
+    // proper table type.
+ TypedBufferedMutator<T> table =
+ new BufferedMutatorDelegator<T>(bufferedMutator);
+
+ return table;
+ }
+
+ /**
+ * @param hbaseConf used to read settings that override defaults
+ * @param conn used to create table from
+ * @param scan that specifies what you want to read from this table.
+ * @return scanner for the table.
+ * @throws IOException
+ */
+ public ResultScanner getResultScanner(Configuration hbaseConf,
+ Connection conn, Scan scan) throws IOException {
+ Table table = conn.getTable(getTableName(hbaseConf));
+ return table.getScanner(scan);
+ }
+
+ /**
+ * Get the table name for this table.
+ *
+   * @param hbaseConf used to read the configured table name, if set.
+ */
+ public TableName getTableName(Configuration hbaseConf) {
+ TableName table =
+ TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName));
+ return table;
+ }
+
+ /**
+ * Used to create the table in HBase. Should be called only once (per HBase
+ * instance).
+ *
+   * @param admin used to create the table.
+   * @param hbaseConf used to read settings that override defaults.
+ */
+ public abstract void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException;
+
+}
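
As a usage sketch for BaseTable (DemoTable and its config key are hypothetical, not part of this patch): a concrete table fixes the config variable and default name in its constructor and implements createTable; callers then obtain a type safe mutator via getTableMutator and a scanner via getResultScanner.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Bytes;

public class DemoTable extends BaseTable<DemoTable> {
  public DemoTable() {
    super("yarn.timeline-service.demo.table.name", "timelineservice.demo");
  }

  @Override
  public void createTable(Admin admin, Configuration hbaseConf)
      throws IOException {
    // Single column family "i"; a real table would also tune versions,
    // TTL, compression, etc.
    HTableDescriptor demo = new HTableDescriptor(getTableName(hbaseConf));
    demo.addFamily(new HColumnDescriptor(Bytes.toBytes("i")));
    admin.createTable(demo);
  }
}
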
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
new file mode 100644
index 0000000..fe8f9c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * Used to wrap an actual {@link BufferedMutator} in a type-safe manner.
+ *
+ * @param <T> The class referring to the table to be written to.
+ */
+class BufferedMutatorDelegator<T> implements TypedBufferedMutator<T> {
+
+ private final BufferedMutator bufferedMutator;
+
+ /**
+ * @param bufferedMutator the mutator to be wrapped for delegation. Shall not
+ * be null.
+ */
+ public BufferedMutatorDelegator(BufferedMutator bufferedMutator) {
+ this.bufferedMutator = bufferedMutator;
+ }
+
+ public TableName getName() {
+ return bufferedMutator.getName();
+ }
+
+ public Configuration getConfiguration() {
+ return bufferedMutator.getConfiguration();
+ }
+
+ public void mutate(Mutation mutation) throws IOException {
+ bufferedMutator.mutate(mutation);
+ }
+
+ public void mutate(List<? extends Mutation> mutations) throws IOException {
+ bufferedMutator.mutate(mutations);
+ }
+
+ public void close() throws IOException {
+ bufferedMutator.close();
+ }
+
+ public void flush() throws IOException {
+ bufferedMutator.flush();
+ }
+
+ public long getWriteBufferSize() {
+ return bufferedMutator.getWriteBufferSize();
+ }
+
+}
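
To see what the delegator buys, a small sketch reusing the hypothetical DemoTable from above: the generic parameter ties a mutator to its table, so handing a mutator for one table to code that writes another fails at compile time rather than at runtime.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;

public class TypeSafetyDemo {
  // Accepts only mutators bound to DemoTable.
  static void writeDemo(TypedBufferedMutator<DemoTable> mutator)
      throws IOException {
    mutator.flush();
  }

  static void wire(Configuration hbaseConf, Connection conn)
      throws IOException {
    // getTableMutator returns TypedBufferedMutator<DemoTable>, so this
    // compiles; a mutator for any other table would not.
    writeDemo(new DemoTable().getTableMutator(hbaseConf, conn));
  }
}
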
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
new file mode 100644
index 0000000..3397d62
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A Column represents the way to store a fully qualified column in a specific
+ * table.
+ */
+public interface Column<T> {
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey identifying the row to write. Nothing gets written when null.
+ * @param tableMutator used to modify the underlying HBase table. Caller is
+ * responsible to pass a mutator for the table that actually has this
+ * column.
+ * @param timestamp version timestamp. When null the server timestamp will be
+ * used.
+ * @param inputValue the value to write to the rowKey and column qualifier.
+ * Nothing gets written when null.
+ * @throws IOException
+ */
+ public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ Long timestamp, Object inputValue) throws IOException;
+
+ /**
+ * Get the latest version of this specified column. Note: this call clones the
+ * value content of the hosting {@link Cell}.
+ *
+ * @param result Cannot be null
+ * @return result object (can be cast to whatever object was written to), or
+ * null when result doesn't contain this column.
+ * @throws IOException
+ */
+ public Object readResult(Result result) throws IOException;
+
+}
\ No newline at end of file
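
A typical implementation is an enum constant that fixes the qualifier and delegates to the ColumnHelper added below (DemoColumn, DemoTable and DemoColumnFamily are hypothetical; DemoColumnFamily is sketched after ColumnFamily.java):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public enum DemoColumn implements Column<DemoTable> {
  CREATED_TIME("created_time");

  private final byte[] columnQualifierBytes;
  private final ColumnHelper<DemoTable> column;

  DemoColumn(String columnQualifier) {
    this.columnQualifierBytes = Bytes.toBytes(columnQualifier);
    this.column = new ColumnHelper<DemoTable>(DemoColumnFamily.INFO);
  }

  @Override
  public void store(byte[] rowKey,
      TypedBufferedMutator<DemoTable> tableMutator, Long timestamp,
      Object inputValue) throws IOException {
    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
        inputValue);
  }

  @Override
  public Object readResult(Result result) throws IOException {
    return column.readResult(result, columnQualifierBytes);
  }
}
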
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
new file mode 100644
index 0000000..c84c016
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Type safe column family.
+ *
+ * @param <T> refers to the table for which this column family is used.
+ */
+public interface ColumnFamily<T> {
+
+ /**
+ * Keep a local copy if you need to avoid overhead of repeated cloning.
+ *
+ * @return a clone of the byte representation of the column family.
+ */
+ public byte[] getBytes();
+
+}
\ No newline at end of file
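
The matching family is likewise a small enum (hypothetical DemoColumnFamily, completing the sketch started under Column.java):

import org.apache.hadoop.hbase.util.Bytes;

public enum DemoColumnFamily implements ColumnFamily<DemoTable> {
  INFO("i");

  private final byte[] bytes;

  DemoColumnFamily(String value) {
    this.bytes = Bytes.toBytes(value);
  }

  @Override
  public byte[] getBytes() {
    // Return a copy, per the interface contract.
    return bytes.clone();
  }
}
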
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
new file mode 100644
index 0000000..6a204dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+
+/**
+ * This class is meant to be used only by explicit Columns, and not directly
+ * by clients to write data.
+ *
+ * @param <T> refers to the table.
+ */
+public class ColumnHelper<T> {
+
+ private final ColumnFamily<T> columnFamily;
+
+ /**
+ * Local copy of bytes representation of columnFamily so that we can avoid
+ * cloning a new copy over and over.
+ */
+ private final byte[] columnFamilyBytes;
+
+ public ColumnHelper(ColumnFamily<T> columnFamily) {
+ this.columnFamily = columnFamily;
+ columnFamilyBytes = columnFamily.getBytes();
+ }
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey identifying the row to write. Nothing gets written when null.
+ * @param tableMutator used to modify the underlying HBase table
+ * @param columnQualifier column qualifier. Nothing gets written when null.
+ * @param timestamp version timestamp. When null the server timestamp will be
+ * used.
+ * @param inputValue the value to write to the rowKey and column qualifier.
+ * Nothing gets written when null.
+ * @throws IOException
+ */
+ public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+ byte[] columnQualifier, Long timestamp, Object inputValue)
+ throws IOException {
+ if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
+ return;
+ }
+ Put p = new Put(rowKey);
+
+ if (timestamp == null) {
+ p.addColumn(columnFamilyBytes, columnQualifier,
+ GenericObjectMapper.write(inputValue));
+ } else {
+ p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+ GenericObjectMapper.write(inputValue));
+ }
+ tableMutator.mutate(p);
+ }
+
+ /**
+ * @return the column family for this column implementation.
+ */
+ public ColumnFamily<T> getColumnFamily() {
+ return columnFamily;
+ }
+
+ /**
+ * Get the latest version of this specified column. Note: this call clones the
+ * value content of the hosting {@link Cell}.
+ *
+ * @param result from which to read the value. Cannot be null
+ * @param columnQualifierBytes referring to the column to be read.
+   * @return the latest version of the specified column, as whichever object
+   *         type was written.
+ * @throws IOException
+ */
+ public Object readResult(Result result, byte[] columnQualifierBytes)
+ throws IOException {
+ if (result == null || columnQualifierBytes == null) {
+ return null;
+ }
+
+ // Would have preferred to be able to use getValueAsByteBuffer and get a
+ // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
+ // that.
+ byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
+ return GenericObjectMapper.read(value);
+ }
+
+ /**
+   * @param result from which to read timeseries data
+ * @param columnPrefixBytes optional prefix to limit columns. If null all
+ * columns are returned.
+   * @return the cell values at each respective time, in the form
+   *         {idA={timestamp1->value1, timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}
+ * @throws IOException
+ */
+ public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
+ Result result, byte[] columnPrefixBytes) throws IOException {
+
+ NavigableMap<String, NavigableMap<Long, Number>> results =
+ new TreeMap<String, NavigableMap<Long, Number>>();
+
+ if (result != null) {
+ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
+ result.getMap();
+
+ NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
+          resultMap == null ? null : resultMap.get(columnFamilyBytes);
+
+ // could be that there is no such column family.
+ if (columnCellMap != null) {
+ for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap
+ .entrySet()) {
+ String columnName = null;
+ if (columnPrefixBytes == null) {
+ // Decode the spaces we encoded in the column name.
+ columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+ } else {
+ // A non-null prefix means columns are actually of the form
+ // prefix!columnNameRemainder
+ byte[][] columnNameParts =
+ Separator.QUALIFIERS.split(entry.getKey(), 2);
+ byte[] actualColumnPrefixBytes = columnNameParts[0];
+ if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+ && columnNameParts.length == 2) {
+ // This is the prefix that we want
+ columnName = Separator.decode(columnNameParts[1]);
+ }
+ }
+
+ // If this column has the prefix we want
+ if (columnName != null) {
+ NavigableMap<Long, Number> cellResults =
+ new TreeMap<Long, Number>();
+ NavigableMap<Long, byte[]> cells = entry.getValue();
+ if (cells != null) {
+ for (Entry<Long, byte[]> cell : cells.entrySet()) {
+ Number value =
+ (Number) GenericObjectMapper.read(cell.getValue());
+ cellResults.put(cell.getKey(), value);
+ }
+ }
+ results.put(columnName, cellResults);
+ }
+ } // for entry : columnCellMap
+ } // if columnCellMap != null
+ } // if result != null
+ return results;
+ }
+
+ /**
+ * @param result from which to read columns
+ * @param columnPrefixBytes optional prefix to limit columns. If null all
+ * columns are returned.
+ * @return the latest values of columns in the column family.
+ * @throws IOException
+ */
+ public Map<String, Object> readResults(Result result, byte[] columnPrefixBytes)
+ throws IOException {
+ Map<String, Object> results = new HashMap<String, Object>();
+
+ if (result != null) {
+      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+      if (columns == null) {
+        return results;
+      }
+      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+ if (entry.getKey() != null && entry.getKey().length > 0) {
+
+ String columnName = null;
+ if (columnPrefixBytes == null) {
+ // Decode the spaces we encoded in the column name.
+ columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+ } else {
+ // A non-null prefix means columns are actually of the form
+ // prefix!columnNameRemainder
+ byte[][] columnNameParts =
+ Separator.QUALIFIERS.split(entry.getKey(), 2);
+ byte[] actualColumnPrefixBytes = columnNameParts[0];
+ if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+ && columnNameParts.length == 2) {
+ // This is the prefix that we want
+ columnName = Separator.decode(columnNameParts[1]);
+ }
+ }
+
+ // If this column has the prefix we want
+ if (columnName != null) {
+ Object value = GenericObjectMapper.read(entry.getValue());
+ results.put(columnName, value);
+ }
+ }
+ } // for entry
+ }
+ return results;
+ }
+
+ /**
+ * @param columnPrefixBytes The byte representation for the column prefix.
+ * Should not contain {@link Separator#QUALIFIERS}.
+ * @param qualifier for the remainder of the column. Any
+ * {@link Separator#QUALIFIERS} will be encoded in the qualifier.
+ * @return fully sanitized column qualifier that is a combination of prefix
+ * and qualifier. If prefix is null, the result is simply the encoded
+ * qualifier without any separator.
+ */
+ public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+ String qualifier) {
+
+ // We don't want column names to have spaces
+ byte[] encodedQualifier = Bytes.toBytes(Separator.SPACE.encode(qualifier));
+ if (columnPrefixBytes == null) {
+ return encodedQualifier;
+ }
+
+    // Tag the encoded qualifier on to the column prefix, joined by the
+    // qualifier separator.
+ byte[] columnQualifier =
+ Separator.QUALIFIERS.join(columnPrefixBytes, encodedQualifier);
+ return columnQualifier;
+ }
+
+}
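
A quick sketch of how getColumnQualifier composes names (the "e" prefix and demo class are hypothetical): the qualifier is first space-encoded via Separator.SPACE so user-supplied spaces cannot collide with separators, then joined to the prefix with Separator.QUALIFIERS.

import org.apache.hadoop.hbase.util.Bytes;

public class QualifierDemo {
  public static void main(String[] args) {
    byte[] prefix = Bytes.toBytes("e");
    byte[] qualifier =
        ColumnHelper.getColumnQualifier(prefix, "start time");
    // Prints the prefix, the qualifier separator, and the encoded
    // remainder as one column name.
    System.out.println(Bytes.toString(qualifier));
  }
}
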
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
new file mode 100644
index 0000000..2eedea0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Used to represent a partially qualified column, where the actual column name
+ * will be composed of a prefix and the remainder of the column qualifier. The
+ * prefix can be null, in which case the column qualifier will be completely
+ * determined when the values are stored.
+ */
+public interface ColumnPrefix<T> {
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey identifying the row to write. Nothing gets written when null.
+ * @param tableMutator used to modify the underlying HBase table. Caller is
+ * responsible to pass a mutator for the table that actually has this
+ * column.
+ * @param qualifier column qualifier. Nothing gets written when null.
+ * @param timestamp version timestamp. When null the server timestamp will be
+ * used.
+ * @param inputValue the value to write to the rowKey and column qualifier.
+ * Nothing gets written when null.
+ * @throws IOException
+ */
+ public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ String qualifier, Long timestamp, Object inputValue) throws IOException;
+
+ /**
+ * Get the latest version of this specified column. Note: this call clones the
+ * value content of the hosting {@link Cell}.
+ *
+ * @param result Cannot be null
+ * @param qualifier column qualifier. Nothing gets read when null.
+ * @return result object (can be cast to whatever object was written to) or
+ * null when specified column qualifier for this prefix doesn't exist
+ * in the result.
+ * @throws IOException
+ */
+ public Object readResult(Result result, String qualifier) throws IOException;
+
+ /**
+   * @param result from which to read columns
+ * @return the latest values of columns in the column family with this prefix
+ * (or all of them if the prefix value is null).
+ * @throws IOException
+ */
+ public Map<String, Object> readResults(Result result) throws IOException;
+
+ /**
+   * @param result from which to read timeseries data
+   * @return the cell values at each respective time, in the form
+   *         {idA={timestamp1->value1, timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}
+ * @throws IOException
+ */
+ public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
+ Result result) throws IOException;
+}
\ No newline at end of file
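
As with Column, an implementation can be an enum that delegates to ColumnHelper and passes its prefix into the prefix-aware read methods (all Demo* names hypothetical, continuing the earlier sketches):

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public enum DemoColumnPrefix implements ColumnPrefix<DemoTable> {
  METRIC("m");

  private final byte[] columnPrefixBytes;
  private final ColumnHelper<DemoTable> column;

  DemoColumnPrefix(String columnPrefix) {
    // Encode the prefix once; qualifiers are encoded per call.
    this.columnPrefixBytes =
        Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
    this.column = new ColumnHelper<DemoTable>(DemoColumnFamily.INFO);
  }

  @Override
  public void store(byte[] rowKey,
      TypedBufferedMutator<DemoTable> tableMutator, String qualifier,
      Long timestamp, Object inputValue) throws IOException {
    column.store(rowKey, tableMutator,
        ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier),
        timestamp, inputValue);
  }

  @Override
  public Object readResult(Result result, String qualifier)
      throws IOException {
    return column.readResult(result,
        ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier));
  }

  @Override
  public Map<String, Object> readResults(Result result) throws IOException {
    return column.readResults(result, columnPrefixBytes);
  }

  @Override
  public NavigableMap<String, NavigableMap<Long, Number>>
      readTimeseriesResults(Result result) throws IOException {
    return column.readTimeseriesResults(result, columnPrefixBytes);
  }
}
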
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9137aeae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
new file mode 100644
index 0000000..2cb6c08
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Range {
+ private final int startIdx;
+ private final int endIdx;
+
+ /**
+ * Defines a range from start index (inclusive) to end index (exclusive).
+ *
+ * @param start
+ * Starting index position
+ * @param end
+ * Ending index position (exclusive)
+ */
+ public Range(int start, int end) {
+ if (start < 0 || end < start) {
+ throw new IllegalArgumentException(
+ "Invalid range, required that: 0 <= start <= end; start=" + start
+ + ", end=" + end);
+ }
+
+ this.startIdx = start;
+ this.endIdx = end;
+ }
+
+ public int start() {
+ return startIdx;
+ }
+
+ public int end() {
+ return endIdx;
+ }
+
+ public int length() {
+ return endIdx - startIdx;
+ }
+}
\ No newline at end of file
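
A minimal illustration of Range's [start, end) convention (RangeDemo is hypothetical):

import java.util.Arrays;
import org.apache.hadoop.hbase.util.Bytes;

public class RangeDemo {
  public static void main(String[] args) {
    byte[] source = Bytes.toBytes("cluster!user");
    Range r = new Range(8, 12); // start inclusive, end exclusive
    // length() == end - start == 4
    System.out.println(Bytes.toString(
        Arrays.copyOfRange(source, r.start(), r.end()))); // user
  }
}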