You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/22 13:34:48 UTC

[13/51] [abbrv] hadoop git commit: YARN-5378. Accommodate app-id->cluster mapping (Sangjin Lee via Varun Saxena)

YARN-5378. Accommodate app-id->cluster mapping (Sangjin Lee via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/889aa68b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/889aa68b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/889aa68b

Branch: refs/heads/YARN-5355
Commit: 889aa68bbd49b2f431670f805a6c25c82acd0181
Parents: b23caaf
Author: Varun Saxena <va...@apache.org>
Authored: Tue Jan 17 20:05:47 2017 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Tue Aug 22 19:03:02 2017 +0530

----------------------------------------------------------------------
 .../storage/HBaseTimelineWriterImpl.java        |  20 +-
 .../apptoflow/AppToFlowColumnPrefix.java        | 206 +++++++++++++++++++
 .../storage/apptoflow/AppToFlowRowKey.java      | 101 +--------
 .../storage/apptoflow/AppToFlowTable.java       |  21 +-
 .../storage/common/ColumnHelper.java            |   5 +-
 .../reader/AbstractTimelineStorageReader.java   |  39 ++--
 .../storage/common/TestRowKeys.java             |   4 +-
 7 files changed, 271 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/889aa68b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 82494ef..df33024 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
@@ -172,9 +172,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         FlowRunRowKey flowRunRowKey =
             new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
         if (event != null) {
-          AppToFlowRowKey appToFlowRowKey =
-              new AppToFlowRowKey(clusterId, appId);
-          onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId,
+          onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
               flowVersion, te, event.getTimestamp());
         }
         // if it's an application entity, store metrics
@@ -193,18 +191,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   }
 
   private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
-      AppToFlowRowKey appToFlowRowKey, String appId, String userId,
-      String flowVersion, TimelineEntity te, long appCreatedTimeStamp)
+      String clusterId, String appId, String userId, String flowVersion,
+      TimelineEntity te, long appCreatedTimeStamp)
       throws IOException {
 
     String flowName = flowRunRowKey.getFlowName();
     Long flowRunId = flowRunRowKey.getFlowRunId();
 
     // store in App to flow table
+    AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
     byte[] rowKey = appToFlowRowKey.getRowKey();
-    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
-    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
-    AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
+    AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId,
+        null, flowName);
+    AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId,
+        null, flowRunId);
+    AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null,
+        userId);
 
     // store in flow run table
     storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889aa68b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java
new file mode 100644
index 0000000..f1e4495
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnPrefix.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies partially qualified columns for the app-to-flow table.
+ */
+public enum AppToFlowColumnPrefix implements ColumnPrefix<AppToFlowTable> {
+
+  /**
+   * The flow name.
+   */
+  FLOW_NAME(AppToFlowColumnFamily.MAPPING, "flow_name"),
+
+  /**
+   * The flow run ID.
+   */
+  FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
+
+  /**
+   * The user.
+   */
+  USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");
+
+  private final ColumnHelper<AppToFlowTable> column;
+  private final ColumnFamily<AppToFlowTable> columnFamily;
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private AppToFlowColumnPrefix(ColumnFamily<AppToFlowTable> columnFamily,
+      String columnPrefix) {
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes =
+          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+    }
+    this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<AppToFlowTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // The qualifier is required; it is appended to the column prefix below.
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+  }
+
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<AppToFlowTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // The qualifier is required; it is appended to the column prefix below.
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  @Override
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  @Override
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
+  }
+
+  @Override
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
+  }
+
+  /**
+   * Retrieve an {@link AppToFlowColumnPrefix} given a prefix name, or null if
+   * there is no match. Only the name is compared; the column family is
+   * assumed to match. The comparison is null-safe on both sides.
+   *
+   * @param columnPrefix Name of the column to retrieve, may be null.
+   * @return the corresponding {@link AppToFlowColumnPrefix} or null
+   */
+  public static final AppToFlowColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based only on name, assume column family matches.
+    for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) {
+      if (columnPrefix == null ? afcp.columnPrefix == null
+          : columnPrefix.equals(afcp.columnPrefix)) {
+        return afcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve the {@link AppToFlowColumnPrefix} that belongs to the given
+   * column family and has the given prefix name, or null if there is no
+   * match. A null {@code columnPrefix} matches only a constant whose own
+   * prefix is null; the comparison is null-safe on both sides.
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param columnPrefix Name of the column to retrieve, may be null.
+   * @return the corresponding {@link AppToFlowColumnPrefix}, or null unless
+   *         both arguments match.
+   */
+  public static final AppToFlowColumnPrefix columnFor(
+      AppToFlowColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm the null prefix case.
+
+    for (AppToFlowColumnPrefix afcp : AppToFlowColumnPrefix.values()) {
+      // Match on column family and name; compare from the non-null side
+      // so that a constant with a null prefix cannot cause an NPE.
+      boolean samePrefix = (columnPrefix == null)
+          ? afcp.columnPrefix == null : columnPrefix.equals(afcp.columnPrefix);
+      if (afcp.columnFamily.equals(columnFamily) && samePrefix) {
+        return afcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889aa68b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index 8df4407..146c475 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -17,41 +17,32 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
 
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 
 /**
- * Represents a rowkey for the app_flow table.
+ * Represents a row key for the app_flow table, which is the app id.
  */
 public class AppToFlowRowKey {
-  private final String clusterId;
   private final String appId;
-  private final KeyConverter<AppToFlowRowKey> appToFlowRowKeyConverter =
-      new AppToFlowRowKeyConverter();
+  private final KeyConverter<String> appIdKeyConverter =
+      new AppIdKeyConverter();
 
-  public AppToFlowRowKey(String clusterId, String appId) {
-    this.clusterId = clusterId;
+  public AppToFlowRowKey(String appId) {
     this.appId = appId;
   }
 
-  public String getClusterId() {
-    return clusterId;
-  }
-
   public String getAppId() {
     return appId;
   }
 
   /**
-   * Constructs a row key prefix for the app_flow table as follows:
-   * {@code clusterId!AppId}.
+   * Constructs the row key for the app_flow table from the app id.
    *
    * @return byte array with the row key
    */
   public  byte[] getRowKey() {
-    return appToFlowRowKeyConverter.encode(this);
+    return appIdKeyConverter.encode(appId);
   }
 
   /**
@@ -61,83 +52,7 @@ public class AppToFlowRowKey {
    * @return an <cite>AppToFlowRowKey</cite> object.
    */
   public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
-    return new AppToFlowRowKeyConverter().decode(rowKey);
-  }
-
-  /**
-   * Encodes and decodes row key for app_flow table. The row key is of the form
-   * clusterId!appId. clusterId is a string and appId is encoded/decoded using
-   * {@link AppIdKeyConverter}.
-   * <p>
-   */
-  final private static class AppToFlowRowKeyConverter implements
-      KeyConverter<AppToFlowRowKey> {
-
-    private final KeyConverter<String> appIDKeyConverter =
-        new AppIdKeyConverter();
-
-    /**
-     * Intended for use in AppToFlowRowKey only.
-     */
-    private AppToFlowRowKeyConverter() {
-    }
-
-
-    /**
-     * App to flow row key is of the form clusterId!appId with the 2 segments
-     * separated by !. The sizes below indicate sizes of both of these segments
-     * in sequence. clusterId is a string. appId is represented as 12 bytes w.
-     * cluster Timestamp part of appid taking 8 bytes(long) and seq id taking 4
-     * bytes(int). Strings are variable in size (i.e. end whenever separator is
-     * encountered). This is used while decoding and helps in determining where
-     * to split.
-     */
-    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
-        Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
-
-    /*
-     * (non-Javadoc)
-     *
-     * Encodes AppToFlowRowKey object into a byte array with each
-     * component/field in AppToFlowRowKey separated by Separator#QUALIFIERS.
-     * This leads to an app to flow table row key of the form clusterId!appId
-     *
-     * @see
-     * org.apache.hadoop.yarn.server.timelineservice.storage.common
-     * .KeyConverter#encode(java.lang.Object)
-     */
-    @Override
-    public byte[] encode(AppToFlowRowKey rowKey) {
-      byte[] first =
-          Separator.encode(rowKey.getClusterId(), Separator.SPACE,
-              Separator.TAB, Separator.QUALIFIERS);
-      byte[] second = appIDKeyConverter.encode(rowKey.getAppId());
-      return Separator.QUALIFIERS.join(first, second);
-    }
-
-    /*
-     * (non-Javadoc)
-     *
-     * Decodes an app to flow row key of the form clusterId!appId represented
-     * in byte format and converts it into an AppToFlowRowKey object.
-     *
-     * @see
-     * org.apache.hadoop.yarn.server.timelineservice.storage.common
-     * .KeyConverter#decode(byte[])
-     */
-    @Override
-    public AppToFlowRowKey decode(byte[] rowKey) {
-      byte[][] rowKeyComponents =
-          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-      if (rowKeyComponents.length != 2) {
-        throw new IllegalArgumentException("the row key is not valid for "
-            + "the app-to-flow table");
-      }
-      String clusterId =
-          Separator.decode(Bytes.toString(rowKeyComponents[0]),
-              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-      String appId = appIDKeyConverter.decode(rowKeyComponents[1]);
-      return new AppToFlowRowKey(clusterId, appId);
-    }
+    String appId = new AppIdKeyConverter().decode(rowKey);
+    return new AppToFlowRowKey(appId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889aa68b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
index 40d95a4..04da5c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
@@ -41,21 +41,32 @@ import java.io.IOException;
  * <pre>
  * |--------------------------------------|
  * |  Row       | Column Family           |
- * |  key       | info                    |
+ * |  key       | mapping                 |
  * |--------------------------------------|
- * | clusterId! | flowName:               |
- * | AppId      | foo@daily_hive_report   |
+ * | appId      | flow_name!cluster1:     |
+ * |            | foo@daily_hive_report   |
  * |            |                         |
- * |            | flowRunId:              |
+ * |            | flow_run_id!cluster1:   |
  * |            | 1452828720457           |
  * |            |                         |
- * |            | user_id:                |
+ * |            | user_id!cluster1:       |
  * |            | admin                   |
  * |            |                         |
+ * |            | flow_name!cluster2:     |
+ * |            | bar@ad_hoc_query        |
  * |            |                         |
+ * |            | flow_run_id!cluster2:   |
+ * |            | 1452828498752           |
+ * |            |                         |
+ * |            | user_id!cluster2:       |
+ * |            | joe                     |
  * |            |                         |
  * |--------------------------------------|
  * </pre>
+ *
+ * It is possible (although unlikely) in a multi-cluster environment that
+ * more than one application exists for a given app id. Different clusters
+ * are recorded as different sets of columns.
  */
 public class AppToFlowTable extends BaseTable<AppToFlowTable> {
   /** app_flow prefix. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889aa68b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index a9c2148..162f973 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -318,8 +318,9 @@ public class ColumnHelper<T> {
   /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.
-   * @param qualifier for the remainder of the column. Any
-   *          {@link Separator#QUALIFIERS} will be encoded in the qualifier.
+   * @param qualifier for the remainder of the column.
+   *          {@link Separator#QUALIFIERS} is permissible in the qualifier
+   *          as it is joined only with the column prefix bytes.
    * @return fully sanitized column qualifier that is a combination of prefix
    *         and qualifier. If prefix is null, the result is simply the encoded
    *         qualifier without any separator.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889aa68b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
index fcd8320..5bacf66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
@@ -17,18 +17,18 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 
-import java.io.IOException;
-
 /**
  * The base class for reading timeline data from the HBase storage. This class
  * provides basic support to validate and augment reader context.
@@ -53,26 +53,38 @@ public abstract class AbstractTimelineStorageReader {
    * Looks up flow context from AppToFlow table.
    *
    * @param appToFlowRowKey to identify Cluster and App Ids.
+   * @param clusterId the cluster id.
    * @param hbaseConf HBase configuration.
    * @param conn HBase Connection.
    * @return flow context information.
    * @throws IOException if any problem occurs while fetching flow information.
    */
   protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
-      Configuration hbaseConf, Connection conn) throws IOException {
+      String clusterId, Configuration hbaseConf, Connection conn)
+      throws IOException {
     byte[] rowKey = appToFlowRowKey.getRowKey();
     Get get = new Get(rowKey);
     Result result = appToFlowTable.getResult(hbaseConf, conn, get);
     if (result != null && !result.isEmpty()) {
-      return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
-          .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
-          .longValue());
+      Object flowName =
+          AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId);
+      Object flowRunId =
+          AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId);
+      Object userId =
+          AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId);
+      if (flowName == null || userId == null || flowRunId == null) {
+        throw new NotFoundException(
+            "Unable to find the context flow name, and flow run id, "
+            + "and user id for clusterId=" + clusterId
+            + ", appId=" + appToFlowRowKey.getAppId());
+      }
+      return new FlowContext((String)userId, (String)flowName,
+          ((Number)flowRunId).longValue());
     } else {
       throw new NotFoundException(
-          "Unable to find the context flow ID and flow run ID for clusterId="
-              + appToFlowRowKey.getClusterId() + ", appId="
-              + appToFlowRowKey.getAppId());
+          "Unable to find the context flow name, and flow run id, "
+          + "and user id for clusterId=" + clusterId
+          + ", appId=" + appToFlowRowKey.getAppId());
     }
   }
 
@@ -102,9 +114,10 @@ public abstract class AbstractTimelineStorageReader {
         || context.getUserId() == null) {
       // Get flow context information from AppToFlow table.
       AppToFlowRowKey appToFlowRowKey =
-          new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+          new AppToFlowRowKey(context.getAppId());
       FlowContext flowContext =
-          lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
+          lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
+          conn);
       context.setFlowName(flowContext.flowName);
       context.setFlowRunId(flowContext.flowRunId);
       context.setUserId(flowContext.userId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/889aa68b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
index 7560f33..cbd2273 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -127,10 +127,8 @@ public class TestRowKeys {
    */
   @Test
   public void testAppToFlowRowKey() {
-    byte[] byteRowKey = new AppToFlowRowKey(CLUSTER,
-        APPLICATION_ID).getRowKey();
+    byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey();
     AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
     assertEquals(APPLICATION_ID, rowKey.getAppId());
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org