You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/03 06:57:39 UTC
[02/50] [abbrv] hadoop git commit: YARN-5378. Accommodate
app-id->cluster mapping (Sangjin Lee via Varun Saxena)
YARN-5378. Accommodate app-id->cluster mapping (Sangjin Lee via Varun Saxena)
(cherry picked from commit 6baea680ba6e5df6f254ced086d6defa64fb99f0)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/44216a46
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/44216a46
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/44216a46
Branch: refs/heads/YARN-5355_branch2
Commit: 44216a4660fbdef7818e5de07d238510db5dec22
Parents: 7d883f4
Author: Varun Saxena <va...@apache.org>
Authored: Tue Jan 17 20:05:47 2017 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Tue Apr 25 23:14:28 2017 +0530
----------------------------------------------------------------------
.../storage/HBaseTimelineWriterImpl.java | 20 +-
.../storage/apptoflow/AppToFlowColumn.java | 148 -------------
.../apptoflow/AppToFlowColumnPrefix.java | 206 +++++++++++++++++++
.../storage/apptoflow/AppToFlowRowKey.java | 101 +--------
.../storage/apptoflow/AppToFlowTable.java | 21 +-
.../storage/common/ColumnHelper.java | 5 +-
.../reader/AbstractTimelineStorageReader.java | 39 ++--
.../storage/common/TestRowKeys.java | 4 +-
8 files changed, 271 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index c1c2a5e..dfd63bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
@@ -172,9 +172,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
FlowRunRowKey flowRunRowKey =
new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
if (event != null) {
- AppToFlowRowKey appToFlowRowKey =
- new AppToFlowRowKey(clusterId, appId);
- onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId,
+ onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
flowVersion, te, event.getTimestamp());
}
// if it's an application entity, store metrics
@@ -193,18 +191,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
}
private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
- AppToFlowRowKey appToFlowRowKey, String appId, String userId,
- String flowVersion, TimelineEntity te, long appCreatedTimeStamp)
+ String clusterId, String appId, String userId, String flowVersion,
+ TimelineEntity te, long appCreatedTimeStamp)
throws IOException {
String flowName = flowRunRowKey.getFlowName();
Long flowRunId = flowRunRowKey.getFlowRunId();
// store in App to flow table
+ AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
byte[] rowKey = appToFlowRowKey.getRowKey();
- AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
- AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
- AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
+ AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId,
+ null, flowName);
+ AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId,
+ null, flowRunId);
+ AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null,
+ userId);
// store in flow run table
storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
deleted file mode 100644
index ff61633..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
-
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-
-import java.io.IOException;
-
-/**
- * Identifies fully qualified columns for the {@link AppToFlowTable}.
- */
-public enum AppToFlowColumn implements Column<AppToFlowTable> {
-
- /**
- * The flow ID.
- */
- FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"),
-
- /**
- * The flow run ID.
- */
- FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
-
- /**
- * The user.
- */
- USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");
-
- private final ColumnHelper<AppToFlowTable> column;
- private final ColumnFamily<AppToFlowTable> columnFamily;
- private final String columnQualifier;
- private final byte[] columnQualifierBytes;
-
- AppToFlowColumn(ColumnFamily<AppToFlowTable> columnFamily,
- String columnQualifier) {
- this.columnFamily = columnFamily;
- this.columnQualifier = columnQualifier;
- // Future-proof by ensuring the right column prefix hygiene.
- this.columnQualifierBytes =
- Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
- this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
- }
-
- /**
- * @return the column name value
- */
- private String getColumnQualifier() {
- return columnQualifier;
- }
-
- @Override
- public byte[] getColumnQualifierBytes() {
- return columnQualifierBytes.clone();
- }
-
- public void store(byte[] rowKey,
- TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
- Object inputValue, Attribute... attributes) throws IOException {
- column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue, attributes);
- }
-
- @Override
- public byte[] getColumnFamilyBytes() {
- return columnFamily.getBytes();
- }
-
- @Override
- public ValueConverter getValueConverter() {
- return column.getValueConverter();
- }
-
- public Object readResult(Result result) throws IOException {
- return column.readResult(result, columnQualifierBytes);
- }
-
- /**
- * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
- * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
- * and only if {@code x.equals(y)} or {@code (x == y == null)}
- *
- * @param columnQualifier Name of the column to retrieve
- * @return the corresponding {@link AppToFlowColumn} or null
- */
- public static final AppToFlowColumn columnFor(String columnQualifier) {
-
- // Match column based on value, assume column family matches.
- for (AppToFlowColumn ec : AppToFlowColumn.values()) {
- // Find a match based only on name.
- if (ec.getColumnQualifier().equals(columnQualifier)) {
- return ec;
- }
- }
-
- // Default to null
- return null;
- }
-
- /**
- * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
- * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
- * if and only if {@code a.equals(b) & x.equals(y)} or
- * {@code (x == y == null)}
- *
- * @param columnFamily The columnFamily for which to retrieve the column.
- * @param name Name of the column to retrieve
- * @return the corresponding {@link AppToFlowColumn} or null if both arguments
- * don't match.
- */
- public static final AppToFlowColumn columnFor(
- AppToFlowColumnFamily columnFamily, String name) {
-
- for (AppToFlowColumn ec : AppToFlowColumn.values()) {
- // Find a match based column family and on name.
- if (ec.columnFamily.equals(columnFamily)
- && ec.getColumnQualifier().equals(name)) {
- return ec;
- }
- }
-
- // Default to null
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java
new file mode 100644
index 0000000..f1e4495
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies partially qualified columns for the app-to-flow table.
+ */
+public enum AppToFlowColumnPrefix implements ColumnPrefix<AppToFlowTable> {
+
+ /**
+ * The flow name.
+ */
+ FLOW_NAME(AppToFlowColumnFamily.MAPPING, "flow_name"),
+
+ /**
+ * The flow run ID.
+ */
+ FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
+
+ /**
+ * The user.
+ */
+ USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");
+
+ private final ColumnHelper<AppToFlowTable> column;
+ private final ColumnFamily<AppToFlowTable> columnFamily;
+ private final String columnPrefix;
+ private final byte[] columnPrefixBytes;
+
+ private AppToFlowColumnPrefix(ColumnFamily<AppToFlowTable> columnFamily,
+ String columnPrefix) {
+ this.columnFamily = columnFamily;
+ this.columnPrefix = columnPrefix;
+ if (columnPrefix == null) {
+ this.columnPrefixBytes = null;
+ } else {
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnPrefixBytes =
+ Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+ }
+ this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnFamilyBytes() {
+ return columnFamily.getBytes();
+ }
+
+ @Override
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<AppToFlowTable> tableMutator, byte[] qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
+
+ @Override
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<AppToFlowTable> tableMutator, String qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
+
+ @Override
+ public ValueConverter getValueConverter() {
+ return column.getValueConverter();
+ }
+
+ @Override
+ public Object readResult(Result result, String qualifier) throws IOException {
+ byte[] columnQualifier =
+ ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier);
+ return column.readResult(result, columnQualifier);
+ }
+
+ @Override
+ public <K> Map<K, Object> readResults(Result result,
+ KeyConverter<K> keyConverter)
+ throws IOException {
+ return column.readResults(result, columnPrefixBytes, keyConverter);
+ }
+
+ @Override
+ public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+ readResultsWithTimestamps(Result result,
+ KeyConverter<K> keyConverter) throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes,
+ keyConverter);
+ }
+
+ /**
+ * Retrieve an {@link AppToFlowColumnPrefix} given a name, or null if there
+ * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+ * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+ *
+ * @param columnPrefix Name of the column to retrieve
+ * @return the corresponding {@link AppToFlowColumnPrefix} or null
+ */
+ public static final AppToFlowColumnPrefix columnFor(String columnPrefix) {
+
+ // Match column based on value, assume column family matches.
+ for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) {
+ // Find a match based only on name.
+ if (afcp.columnPrefix.equals(columnPrefix)) {
+ return afcp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+
+ /**
+ * Retrieve an {@link AppToFlowColumnPrefix} given a name, or null if there
+ * is no match. The following holds true:
+ * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+ * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+ *
+ * @param columnFamily The columnFamily for which to retrieve the column.
+ * @param columnPrefix Name of the column to retrieve
+ * @return the corresponding {@link AppToFlowColumnPrefix} or null if both
+ * arguments don't match.
+ */
+ public static final AppToFlowColumnPrefix columnFor(
+ AppToFlowColumnFamily columnFamily, String columnPrefix) {
+
+ // TODO: needs unit test to confirm and need to update javadoc to explain
+ // null prefix case.
+
+ for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) {
+ // Find a match based column family and on name.
+ if (afcp.columnFamily.equals(columnFamily)
+ && (((columnPrefix == null) && (afcp.columnPrefix == null)) ||
+ (afcp.columnPrefix.equals(columnPrefix)))) {
+ return afcp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index 8df4407..146c475 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -17,41 +17,32 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
- * Represents a rowkey for the app_flow table.
+ * Represents a row key for the app_flow table, which is the app id.
*/
public class AppToFlowRowKey {
- private final String clusterId;
private final String appId;
- private final KeyConverter<AppToFlowRowKey> appToFlowRowKeyConverter =
- new AppToFlowRowKeyConverter();
+ private final KeyConverter<String> appIdKeyConverter =
+ new AppIdKeyConverter();
- public AppToFlowRowKey(String clusterId, String appId) {
- this.clusterId = clusterId;
+ public AppToFlowRowKey(String appId) {
this.appId = appId;
}
- public String getClusterId() {
- return clusterId;
- }
-
public String getAppId() {
return appId;
}
/**
- * Constructs a row key prefix for the app_flow table as follows:
- * {@code clusterId!AppId}.
+ * Constructs a row key prefix for the app_flow table.
*
* @return byte array with the row key
*/
public byte[] getRowKey() {
- return appToFlowRowKeyConverter.encode(this);
+ return appIdKeyConverter.encode(appId);
}
/**
@@ -61,83 +52,7 @@ public class AppToFlowRowKey {
* @return an <cite>AppToFlowRowKey</cite> object.
*/
public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
- return new AppToFlowRowKeyConverter().decode(rowKey);
- }
-
- /**
- * Encodes and decodes row key for app_flow table. The row key is of the form
- * clusterId!appId. clusterId is a string and appId is encoded/decoded using
- * {@link AppIdKeyConverter}.
- * <p>
- */
- final private static class AppToFlowRowKeyConverter implements
- KeyConverter<AppToFlowRowKey> {
-
- private final KeyConverter<String> appIDKeyConverter =
- new AppIdKeyConverter();
-
- /**
- * Intended for use in AppToFlowRowKey only.
- */
- private AppToFlowRowKeyConverter() {
- }
-
-
- /**
- * App to flow row key is of the form clusterId!appId with the 2 segments
- * separated by !. The sizes below indicate sizes of both of these segments
- * in sequence. clusterId is a string. appId is represented as 12 bytes w.
- * cluster Timestamp part of appid taking 8 bytes(long) and seq id taking 4
- * bytes(int). Strings are variable in size (i.e. end whenever separator is
- * encountered). This is used while decoding and helps in determining where
- * to split.
- */
- private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
- Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
-
- /*
- * (non-Javadoc)
- *
- * Encodes AppToFlowRowKey object into a byte array with each
- * component/field in AppToFlowRowKey separated by Separator#QUALIFIERS.
- * This leads to an app to flow table row key of the form clusterId!appId
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common
- * .KeyConverter#encode(java.lang.Object)
- */
- @Override
- public byte[] encode(AppToFlowRowKey rowKey) {
- byte[] first =
- Separator.encode(rowKey.getClusterId(), Separator.SPACE,
- Separator.TAB, Separator.QUALIFIERS);
- byte[] second = appIDKeyConverter.encode(rowKey.getAppId());
- return Separator.QUALIFIERS.join(first, second);
- }
-
- /*
- * (non-Javadoc)
- *
- * Decodes an app to flow row key of the form clusterId!appId represented
- * in byte format and converts it into an AppToFlowRowKey object.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common
- * .KeyConverter#decode(byte[])
- */
- @Override
- public AppToFlowRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 2) {
- throw new IllegalArgumentException("the row key is not valid for "
- + "the app-to-flow table");
- }
- String clusterId =
- Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String appId = appIDKeyConverter.decode(rowKeyComponents[1]);
- return new AppToFlowRowKey(clusterId, appId);
- }
+ String appId = new AppIdKeyConverter().decode(rowKey);
+ return new AppToFlowRowKey(appId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
index 301cf99..583ee04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
@@ -41,21 +41,32 @@ import java.io.IOException;
* <pre>
* |--------------------------------------|
* | Row | Column Family |
- * | key | info |
+ * | key | mapping |
* |--------------------------------------|
- * | clusterId! | flowName: |
- * | AppId | foo@daily_hive_report |
+ * | appId | flow_name!cluster1: |
+ * | | foo@daily_hive_report |
* | | |
- * | | flowRunId: |
+ * | | flow_run_id!cluster1: |
* | | 1452828720457 |
* | | |
- * | | user_id: |
+ * | | user_id!cluster1: |
* | | admin |
* | | |
+ * | | flow_name!cluster2: |
+ * | | bar@ad_hoc_query |
* | | |
+ * | | flow_run_id!cluster2: |
+ * | | 1452828498752 |
+ * | | |
+ * | | user_id!cluster2: |
+ * | | joe |
* | | |
* |--------------------------------------|
* </pre>
+ *
+ * It is possible (although unlikely) in a multi-cluster environment that there
+ * may be more than one applications for a given app id. Different clusters are
+ * recorded as different sets of columns.
*/
public class AppToFlowTable extends BaseTable<AppToFlowTable> {
/** app_flow prefix. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index be55db5..b9815eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -316,8 +316,9 @@ public class ColumnHelper<T> {
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.
- * @param qualifier for the remainder of the column. Any
- * {@link Separator#QUALIFIERS} will be encoded in the qualifier.
+ * @param qualifier for the remainder of the column.
+ * {@link Separator#QUALIFIERS} is permissible in the qualifier
+ * as it is joined only with the column prefix bytes.
* @return fully sanitized column qualifier that is a combination of prefix
* and qualifier. If prefix is null, the result is simply the encoded
* qualifier without any separator.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
index fcd8320..5bacf66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
@@ -17,18 +17,18 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.webapp.NotFoundException;
-import java.io.IOException;
-
/**
* The base class for reading timeline data from the HBase storage. This class
* provides basic support to validate and augment reader context.
@@ -53,26 +53,38 @@ public abstract class AbstractTimelineStorageReader {
* Looks up flow context from AppToFlow table.
*
* @param appToFlowRowKey to identify Cluster and App Ids.
+ * @param clusterId the cluster id.
* @param hbaseConf HBase configuration.
* @param conn HBase Connection.
* @return flow context information.
* @throws IOException if any problem occurs while fetching flow information.
*/
protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
- Configuration hbaseConf, Connection conn) throws IOException {
+ String clusterId, Configuration hbaseConf, Connection conn)
+ throws IOException {
byte[] rowKey = appToFlowRowKey.getRowKey();
Get get = new Get(rowKey);
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) {
- return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
- .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
- ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
- .longValue());
+ Object flowName =
+ AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId);
+ Object flowRunId =
+ AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId);
+ Object userId =
+ AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId);
+ if (flowName == null || userId == null || flowRunId == null) {
+ throw new NotFoundException(
+ "Unable to find the context flow name, and flow run id, "
+ + "and user id for clusterId=" + clusterId
+ + ", appId=" + appToFlowRowKey.getAppId());
+ }
+ return new FlowContext((String)userId, (String)flowName,
+ ((Number)flowRunId).longValue());
} else {
throw new NotFoundException(
- "Unable to find the context flow ID and flow run ID for clusterId="
- + appToFlowRowKey.getClusterId() + ", appId="
- + appToFlowRowKey.getAppId());
+ "Unable to find the context flow name, and flow run id, "
+ + "and user id for clusterId=" + clusterId
+ + ", appId=" + appToFlowRowKey.getAppId());
}
}
@@ -102,9 +114,10 @@ public abstract class AbstractTimelineStorageReader {
|| context.getUserId() == null) {
// Get flow context information from AppToFlow table.
AppToFlowRowKey appToFlowRowKey =
- new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+ new AppToFlowRowKey(context.getAppId());
FlowContext flowContext =
- lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
+ lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
+ conn);
context.setFlowName(flowContext.flowName);
context.setFlowRunId(flowContext.flowRunId);
context.setUserId(flowContext.userId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44216a46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
index 7560f33..cbd2273 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -127,10 +127,8 @@ public class TestRowKeys {
*/
@Test
public void testAppToFlowRowKey() {
- byte[] byteRowKey = new AppToFlowRowKey(CLUSTER,
- APPLICATION_ID).getRowKey();
+ byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey();
AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
- assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org