You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:48 UTC

[31/50] [abbrv] hadoop git commit: YARN-4053. Change the way metric values are stored in HBase Storage (Varun Saxena via sjlee)

YARN-4053. Change the way metric values are stored in HBase Storage (Varun Saxena via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6f57ecc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6f57ecc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6f57ecc

Branch: refs/heads/feature-YARN-2928
Commit: e6f57ecc03d63eb7183bfc76cfafbc3f5023b8a8
Parents: af14edb
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Nov 20 10:03:02 2015 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:59:39 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../storage/FlowRunEntityReader.java            |   4 +-
 .../application/ApplicationColumnPrefix.java    |  23 +++-
 .../storage/common/ColumnHelper.java            |  27 ++++-
 .../storage/common/GenericConverter.java        |  48 ++++++++
 .../storage/common/LongConverter.java           |  78 +++++++++++++
 .../storage/common/NumericValueConverter.java   |  38 +++++++
 .../storage/common/TimelineStorageUtils.java    |  11 ++
 .../storage/common/ValueConverter.java          |  45 ++++++++
 .../storage/common/package-info.java            |  28 +++++
 .../storage/entity/EntityColumn.java            |   1 -
 .../storage/entity/EntityColumnPrefix.java      |  25 ++++-
 .../storage/flow/FlowRunColumn.java             |  24 +++-
 .../storage/flow/FlowRunColumnPrefix.java       |  21 +++-
 .../storage/flow/FlowScanner.java               | 110 ++++++++++++++-----
 .../storage/TestHBaseTimelineStorage.java       |  81 +++++++++++++-
 .../storage/flow/TestHBaseStorageFlowRun.java   |  15 +--
 17 files changed, 524 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fdd0d83..74f58f6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -140,6 +140,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
 
     YARN-4221. Store user in app to flow table (Varun Saxena via sjlee)
 
+    YARN-4053. Change the way metric values are stored in HBase Storage (Varun
+    Saxena via sjlee)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
index c4b4e91..ebf2d27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -137,7 +137,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
     }
 
     // read the start time
-    Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result);
+    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
     if (startTime != null) {
       flowRun.setStartTime(startTime.longValue());
     }
@@ -147,7 +147,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
     }
 
     // read the end time if available
-    Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result);
+    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
     if (endTime != null) {
       flowRun.setMaxEndTime(endTime.longValue());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index d7b5773..b06f5c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -26,8 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
@@ -63,7 +66,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
   /**
    * Metrics are stored with the metric name as the column name.
    */
-  METRIC(ApplicationColumnFamily.METRICS, null);
+  METRIC(ApplicationColumnFamily.METRICS, null,
+      LongConverter.getInstance());
 
   private final ColumnHelper<ApplicationTable> column;
   private final ColumnFamily<ApplicationTable> columnFamily;
@@ -83,7 +87,20 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
    */
   private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
       String columnPrefix) {
-    column = new ColumnHelper<ApplicationTable>(columnFamily);
+    this(columnFamily, columnPrefix, GenericConverter.getInstance());
+  }
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   * @param converter used to encode/decode values to be stored in HBase for
+   * this column prefix.
+   */
+  private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
+      String columnPrefix, ValueConverter converter) {
+    column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
     this.columnFamily = columnFamily;
     this.columnPrefix = columnPrefix;
     if (columnPrefix == null) {
@@ -127,7 +144,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
 
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         attributes);
- }
+  }
 
   /*
    * (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index 3a2e088..1e63ce5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 /**
@@ -50,9 +49,20 @@ public class ColumnHelper<T> {
    */
   private final byte[] columnFamilyBytes;
 
+  private final ValueConverter converter;
+
   public ColumnHelper(ColumnFamily<T> columnFamily) {
+    this(columnFamily, GenericConverter.getInstance());
+  }
+
+  public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
     this.columnFamily = columnFamily;
     columnFamilyBytes = columnFamily.getBytes();
+    if (converter == null) {
+      this.converter = GenericConverter.getInstance();
+    } else {
+      this.converter = converter;
+    }
   }
 
   /**
@@ -83,7 +93,7 @@ public class ColumnHelper<T> {
     Put p = new Put(rowKey);
     timestamp = getPutTimestamp(timestamp, attributes);
     p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
-        GenericObjectMapper.write(inputValue));
+        converter.encodeValue(inputValue));
     if ((attributes != null) && (attributes.length > 0)) {
       for (Attribute attribute : attributes) {
         p.setAttribute(attribute.getName(), attribute.getValue());
@@ -148,7 +158,7 @@ public class ColumnHelper<T> {
     // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
     // that.
     byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
-    return GenericObjectMapper.read(value);
+    return converter.decodeValue(value);
   }
 
   /**
@@ -206,7 +216,7 @@ public class ColumnHelper<T> {
             if (cells != null) {
               for (Entry<Long, byte[]> cell : cells.entrySet()) {
                 V value =
-                    (V) GenericObjectMapper.read(cell.getValue());
+                    (V) converter.decodeValue(cell.getValue());
                 cellResults.put(
                     TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
                     value);
@@ -266,7 +276,7 @@ public class ColumnHelper<T> {
 
           // If this column has the prefix we want
           if (columnName != null) {
-            Object value = GenericObjectMapper.read(entry.getValue());
+            Object value = converter.decodeValue(entry.getValue());
             results.put(columnName, value);
           }
         }
@@ -313,7 +323,7 @@ public class ColumnHelper<T> {
               // This is the prefix that we want
               byte[][] columnQualifierParts =
                   Separator.VALUES.split(columnNameParts[1]);
-              Object value = GenericObjectMapper.read(entry.getValue());
+              Object value = converter.decodeValue(entry.getValue());
               // we return the columnQualifier in parts since we don't know
               // which part is of which data type
               results.put(columnQualifierParts, value);
@@ -371,6 +381,11 @@ public class ColumnHelper<T> {
         Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
     return columnQualifier;
   }
+
+  public ValueConverter getValueConverter() {
+    return converter;
+  }
+
   /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java
new file mode 100644
index 0000000..c34bfcb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+
+/**
+ * Uses GenericObjectMapper to encode objects as bytes and decode bytes as
+ * objects.
+ */
+public final class GenericConverter implements ValueConverter {
+  private static final GenericConverter INSTANCE = new GenericConverter();
+
+  private GenericConverter() {
+  }
+
+  public static GenericConverter getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public byte[] encodeValue(Object value) throws IOException {
+    return GenericObjectMapper.write(value);
+  }
+
+  @Override
+  public Object decodeValue(byte[] bytes) throws IOException {
+    return GenericObjectMapper.read(bytes);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
new file mode 100644
index 0000000..cdb8619
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Encodes a value by interpreting it as a Long and converting it to bytes and
+ * decodes a set of bytes as a Long.
+ */
+public final class LongConverter implements NumericValueConverter {
+  private static final LongConverter INSTANCE = new LongConverter();
+
+  private LongConverter() {
+  }
+
+  public static LongConverter getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public byte[] encodeValue(Object value) throws IOException {
+    if (!TimelineStorageUtils.isIntegralValue(value)) {
+      throw new IOException("Expected integral value");
+    }
+    return Bytes.toBytes(((Number)value).longValue());
+  }
+
+  @Override
+  public Object decodeValue(byte[] bytes) throws IOException {
+    if (bytes == null) {
+      return null;
+    }
+    return Bytes.toLong(bytes);
+  }
+
+  /**
+   * Compares two numbers as longs. If either number is null, it will be taken
+   * as 0.
+   * @param num1
+   * @param num2
+   * @return -1 if num1 is less than num2, 0 if num1 is equal to num2 and 1 if
+   * num1 is greater than num2.
+   */
+  @Override
+  public int compare(Number num1, Number num2) {
+    return Long.compare((num1 == null) ? 0L : num1.longValue(),
+        (num2 == null) ? 0L : num2.longValue());
+  }
+
+  @Override
+  public Number add(Number num1, Number num2, Number...numbers) {
+    long sum = ((num1 == null) ? 0L : num1.longValue()) +
+        ((num2 == null) ? 0L : num2.longValue());
+    for (Number num : numbers) {
+      sum = sum + ((num == null) ? 0L : num.longValue());
+    }
+    return sum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java
new file mode 100644
index 0000000..70964cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.Comparator;
+
+/**
+ * Extends ValueConverter interface for numeric converters to support numerical
+ * operations such as comparison, addition, etc.
+ */
+public interface NumericValueConverter extends ValueConverter,
+    Comparator<Number> {
+  /**
+   * Adds two or more numbers. If either of the numbers are null, it is taken as
+   * 0.
+   * @param num1
+   * @param num2
+   * @param numbers
+   * @return result after adding up the numbers.
+   */
+  Number add(Number num1, Number num2, Number...numbers);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index c1aaf19..e30f699 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -472,4 +472,15 @@ public class TimelineStorageUtils {
     }
     return true;
   }
+
+  /**
+   * Checks if passed object is of integral type(Short/Integer/Long).
+   * @param obj
+   * @return true if object passed is of type Short or Integer or Long, false
+   * otherwise.
+   */
+  public static boolean isIntegralValue(Object obj) {
+    return (obj instanceof Short) || (obj instanceof Integer) ||
+        (obj instanceof Long);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java
new file mode 100644
index 0000000..2388ba5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+/**
+ * Converter used to encode/decode value associated with a column prefix or a
+ * column.
+ */
+public interface ValueConverter {
+
+  /**
+   * Encode an object as a byte array depending on the converter implementation.
+   * @param value
+   * @return a byte array
+   * @throws IOException
+   */
+  byte[] encodeValue(Object value) throws IOException;
+
+  /**
+   * Decode a byte array and convert it into an object depending on the
+   * converter implementation.
+   * @param bytes
+   * @return an object
+   * @throws IOException
+   */
+  Object decodeValue(byte[] bytes) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
new file mode 100644
index 0000000..0df5b8a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains
+ * a set of utility classes used across backend storage reader and writer.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 8ae19b8..e12b6e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 0d4e5a8..abede9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -26,8 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
@@ -63,7 +66,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
   /**
    * Metrics are stored with the metric name as the column name.
    */
-  METRIC(EntityColumnFamily.METRICS, null);
+  METRIC(EntityColumnFamily.METRICS, null,
+      LongConverter.getInstance());
 
   private final ColumnHelper<EntityTable> column;
   private final ColumnFamily<EntityTable> columnFamily;
@@ -83,7 +87,20 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    */
   EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
       String columnPrefix) {
-    column = new ColumnHelper<EntityTable>(columnFamily);
+    this(columnFamily, columnPrefix, GenericConverter.getInstance());
+  }
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   * @param converter used to encode/decode values to be stored in HBase for
+   * this column prefix.
+   */
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix, ValueConverter converter) {
+    column = new ColumnHelper<EntityTable>(columnFamily, converter);
     this.columnFamily = columnFamily;
     this.columnPrefix = columnPrefix;
     if (columnPrefix == null) {
@@ -128,7 +145,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
 
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         attributes);
- }
+  }
 
   /*
    * (non-Javadoc)
@@ -155,7 +172,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
 
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         attributes);
- }
+  }
 
   /*
    * (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 5079cc0..148a37f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -24,9 +24,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
  * Identifies fully qualified columns for the {@link FlowRunTable}.
@@ -38,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
    * application start times.
    */
   MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
-      AggregationOperation.MIN),
+      AggregationOperation.MIN, LongConverter.getInstance()),
 
   /**
    * When the flow ended. This is the maximum of currently known application end
    * times.
    */
   MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
-      AggregationOperation.MAX),
+      AggregationOperation.MAX, LongConverter.getInstance()),
 
   /**
    * The version of the flow that this flow belongs to.
@@ -60,13 +63,20 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
 
   private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
       String columnQualifier, AggregationOperation aggOp) {
+    this(columnFamily, columnQualifier, aggOp,
+        GenericConverter.getInstance());
+  }
+
+  private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
+      String columnQualifier, AggregationOperation aggOp,
+      ValueConverter converter) {
     this.columnFamily = columnFamily;
     this.columnQualifier = columnQualifier;
     this.aggOp = aggOp;
     // Future-proof by ensuring the right column prefix hygiene.
     this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE
         .encode(columnQualifier));
-    this.column = new ColumnHelper<FlowRunTable>(columnFamily);
+    this.column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
   }
 
   /**
@@ -80,6 +90,10 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
     return columnQualifierBytes.clone();
   }
 
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
   public AggregationOperation getAggregationOperation() {
     return aggOp;
   }
@@ -130,6 +144,10 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
     return null;
   }
 
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
   /**
    * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
    * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index b090bba..eb055fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
  * Identifies partially qualified columns for the {@link FlowRunTable}.
@@ -38,7 +40,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
   /**
    * To store flow run info values.
    */
-  METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM);
+  METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM,
+      LongConverter.getInstance());
 
   private final ColumnHelper<FlowRunTable> column;
   private final ColumnFamily<FlowRunTable> columnFamily;
@@ -61,8 +64,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
    *          for this column.
    */
   private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
-      String columnPrefix, AggregationOperation fra) {
-    column = new ColumnHelper<FlowRunTable>(columnFamily);
+      String columnPrefix, AggregationOperation fra, ValueConverter converter) {
+    column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
     this.columnFamily = columnFamily;
     this.columnPrefix = columnPrefix;
     if (columnPrefix == null) {
@@ -86,6 +89,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     return columnPrefixBytes.clone();
   }
 
+  public byte[] getColumnPrefixBytes(String qualifier) {
+    return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+  }
+
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
   public AggregationOperation getAttribute() {
     return aggOp;
   }
@@ -205,6 +216,10 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     return null;
   }
 
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
   /**
    * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is
    * no match. The following holds true:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index a537891..d541df0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -39,8 +39,10 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -114,6 +116,45 @@ class FlowScanner implements RegionScanner, Closeable {
   }
 
   /**
+   * Get value converter associated with a column or a column prefix. If nothing
+   * matches, generic converter is returned.
+   * @param colQualifierBytes
+   * @return value converter implementation.
+   */
+  private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
+    // Iterate over all the column prefixes for flow run table and get the
+    // appropriate converter for the column qualifier passed if prefix matches.
+    for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
+      byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
+      if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
+          colQualifierBytes, 0, colPrefixBytes.length) == 0) {
+        return colPrefix.getValueConverter();
+      }
+    }
+    // Iterate over all the columns for flow run table and get the
+    // appropriate converter for the column qualifier passed if match occurs.
+    for (FlowRunColumn column : FlowRunColumn.values()) {
+      if (Bytes.compareTo(
+          column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
+        return column.getValueConverter();
+      }
+    }
+    // Return generic converter if nothing matches.
+    return GenericConverter.getInstance();
+  }
+
+  /**
+   * Checks if the converter is a numeric converter or not. For a converter to
+   * be numeric, it must implement {@link NumericValueConverter} interface.
+   * @param converter
+   * @return true, if converter is of type NumericValueConverter, false
+   * otherwise.
+   */
+  private static boolean isNumericConverter(ValueConverter converter) {
+    return (converter instanceof NumericValueConverter);
+  }
+
+  /**
    * This method loops through the cells in a given row of the
    * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
    * to process the contents. It then calculates the sum or min or max for each
@@ -141,20 +182,32 @@ class FlowScanner implements RegionScanner, Closeable {
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
     int addedCnt = 0;
+    ValueConverter converter = null;
     while (((cell = peekAtNextCell(limit)) != null)
         && (limit <= 0 || addedCnt < limit)) {
       byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
       if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
-        addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
+        if (converter != null && isNumericConverter(converter)) {
+          addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+              (NumericValueConverter)converter);
+        }
         resetState(currentColumnCells, alreadySeenAggDim);
         currentColumnQualifier = newColumnQualifier;
         currentAggOp = getCurrentAggOp(cell);
+        converter = getValueConverter(newColumnQualifier);
       }
-      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
+      // No operation needs to be performed on non numeric converters.
+      if (!isNumericConverter(converter)) {
+        nextCell(limit);
+        continue;
+      }
+      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
+          (NumericValueConverter)converter);
       nextCell(limit);
     }
     if (!currentColumnCells.isEmpty()) {
-      emitCells(cells, currentColumnCells, currentAggOp);
+      emitCells(cells, currentColumnCells, currentAggOp,
+          (NumericValueConverter)converter);
     }
     return hasMore();
   }
@@ -183,7 +236,8 @@ class FlowScanner implements RegionScanner, Closeable {
 
   private void collectCells(SortedSet<Cell> currentColumnCells,
       AggregationOperation currentAggOp, Cell cell,
-      Set<String> alreadySeenAggDim) throws IOException {
+      Set<String> alreadySeenAggDim, NumericValueConverter converter)
+      throws IOException {
     if (currentAggOp == null) {
       // not a min/max/metric cell, so just return it as is
       currentColumnCells.add(cell);
@@ -197,7 +251,8 @@ class FlowScanner implements RegionScanner, Closeable {
         currentColumnCells.add(cell);
       } else {
         Cell currentMinCell = currentColumnCells.first();
-        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
+        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
+            converter);
         if (!currentMinCell.equals(newMinCell)) {
           currentColumnCells.remove(currentMinCell);
           currentColumnCells.add(newMinCell);
@@ -209,7 +264,8 @@ class FlowScanner implements RegionScanner, Closeable {
         currentColumnCells.add(cell);
       } else {
         Cell currentMaxCell = currentColumnCells.first();
-        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
+        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
+            converter);
         if (!currentMaxCell.equals(newMaxCell)) {
           currentColumnCells.remove(currentMaxCell);
           currentColumnCells.add(newMaxCell);
@@ -245,7 +301,8 @@ class FlowScanner implements RegionScanner, Closeable {
    * parameter.
    */
   private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp) throws IOException {
+      AggregationOperation currentAggOp, NumericValueConverter converter)
+      throws IOException {
     if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
       return 0;
     }
@@ -261,7 +318,7 @@ class FlowScanner implements RegionScanner, Closeable {
       return currentColumnCells.size();
     case SUM:
     case SUM_FINAL:
-      Cell sumCell = processSummation(currentColumnCells);
+      Cell sumCell = processSummation(currentColumnCells, converter);
       cells.add(sumCell);
       return 1;
     default:
@@ -276,24 +333,24 @@ class FlowScanner implements RegionScanner, Closeable {
    * sum of a metric for a flow run is the summation at the point of the last
    * metric update in that flow till that time.
    */
-  private Cell processSummation(SortedSet<Cell> currentColumnCells)
-      throws IOException {
+  private Cell processSummation(SortedSet<Cell> currentColumnCells,
+      NumericValueConverter converter) throws IOException {
     Number sum = 0;
     Number currentValue = 0;
     long ts = 0L;
-    long mostCurrentTimestamp = 0l;
+    long mostCurrentTimestamp = 0L;
     Cell mostRecentCell = null;
     for (Cell cell : currentColumnCells) {
-      currentValue = (Number) GenericObjectMapper.read(CellUtil
-          .cloneValue(cell));
+      currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
       ts = cell.getTimestamp();
       if (mostCurrentTimestamp < ts) {
         mostCurrentTimestamp = ts;
         mostRecentCell = cell;
       }
-      sum = sum.longValue() + currentValue.longValue();
+      sum = converter.add(sum, currentValue);
     }
-    Cell sumCell = createNewCell(mostRecentCell, sum);
+    byte[] sumBytes = converter.encodeValue(sum);
+    Cell sumCell = createNewCell(mostRecentCell, sumBytes);
     return sumCell;
   }
 
@@ -308,18 +365,20 @@ class FlowScanner implements RegionScanner, Closeable {
    * @throws IOException
    */
   private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
-      AggregationOperation currentAggOp) throws IOException {
+      AggregationOperation currentAggOp, NumericValueConverter converter)
+      throws IOException {
     if (previouslyChosenCell == null) {
       return currentCell;
     }
     try {
-      long previouslyChosenCellValue = ((Number) GenericObjectMapper
-          .read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
-      long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
-          .cloneValue(currentCell))).longValue();
+      Number previouslyChosenCellValue = (Number)converter.decodeValue(
+          CellUtil.cloneValue(previouslyChosenCell));
+      Number currentCellValue = (Number) converter.decodeValue(CellUtil
+          .cloneValue(currentCell));
       switch (currentAggOp) {
       case MIN:
-        if (currentCellValue < previouslyChosenCellValue) {
+        if (converter.compare(
+            currentCellValue, previouslyChosenCellValue) < 0) {
           // new value is minimum, hence return this cell
           return currentCell;
         } else {
@@ -327,7 +386,8 @@ class FlowScanner implements RegionScanner, Closeable {
           return previouslyChosenCell;
         }
       case MAX:
-        if (currentCellValue > previouslyChosenCellValue) {
+        if (converter.compare(
+            currentCellValue, previouslyChosenCellValue) > 0) {
           // new value is max, hence return this cell
           return currentCell;
         } else {
@@ -343,8 +403,8 @@ class FlowScanner implements RegionScanner, Closeable {
     }
   }
 
-  private Cell createNewCell(Cell origCell, Number number) throws IOException {
-    byte[] newValue = GenericObjectMapper.write(number);
+  private Cell createNewCell(Cell origCell, byte[] newValue)
+      throws IOException {
     return CellUtil.createCell(CellUtil.cloneRow(origCell),
         CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
         origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 701615e..30ead40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -90,6 +91,15 @@ public class TestHBaseTimelineStorage {
     TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
   }
 
+  private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) {
+    assertEquals(m1.size(), m2.size());
+    for (Map.Entry<Long, Number> entry : m2.entrySet()) {
+      Number val = m1.get(entry.getKey());
+      assertNotNull(val);
+      assertEquals(val.longValue(), entry.getValue().longValue());
+    }
+  }
+
   @Test
   public void testWriteApplicationToHBase() throws Exception {
     TimelineEntities te = new TimelineEntities();
@@ -243,7 +253,7 @@ public class TestHBaseTimelineStorage {
           ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
       NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-      assertEquals(metricValues, metricMap);
+      matchMetrics(metricValues, metricMap);
 
       // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
@@ -273,7 +283,7 @@ public class TestHBaseTimelineStorage {
       assertEquals(metrics, metrics2);
       for (TimelineMetric metric2 : metrics2) {
         Map<Long, Number> metricValues2 = metric2.getValues();
-        assertEquals(metricValues, metricValues2);
+        matchMetrics(metricValues, metricValues2);
       }
     } finally {
       if (hbi != null) {
@@ -451,7 +461,7 @@ public class TestHBaseTimelineStorage {
               EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
           NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-          assertEquals(metricValues, metricMap);
+          matchMetrics(metricValues, metricMap);
         }
       }
       assertEquals(1, rowCount);
@@ -488,7 +498,7 @@ public class TestHBaseTimelineStorage {
       assertEquals(metrics, metrics2);
       for (TimelineMetric metric2 : metrics2) {
         Map<Long, Number> metricValues2 = metric2.getValues();
-        assertEquals(metricValues, metricValues2);
+        matchMetrics(metricValues, metricValues2);
       }
     } finally {
       if (hbi != null) {
@@ -743,6 +753,69 @@ public class TestHBaseTimelineStorage {
     }
   }
 
+  @Test
+  public void testNonIntegralMetricValues() throws IOException {
+    TimelineEntities teApp = new TimelineEntities();
+    ApplicationEntity entityApp = new ApplicationEntity();
+    String appId = "application_1000178881110_2002";
+    entityApp.setId(appId);
+    entityApp.setCreatedTime(1425016501000L);
+    entityApp.setModifiedTime(1425026901000L);
+    // add metrics with floating point values
+    Set<TimelineMetric> metricsApp = new HashSet<>();
+    TimelineMetric mApp = new TimelineMetric();
+    mApp.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricAppValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricAppValues.put(ts - 20, 10.5);
+    metricAppValues.put(ts - 10, 20.5);
+    mApp.setType(Type.TIME_SERIES);
+    mApp.setValues(metricAppValues);
+    metricsApp.add(mApp);
+    entityApp.addMetrics(metricsApp);
+    teApp.addEntity(entityApp);
+
+    TimelineEntities teEntity = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId("hello");
+    entity.setType("world");
+    entity.setCreatedTime(1425016501000L);
+    entity.setModifiedTime(1425026901000L);
+    // add metrics with floating point values
+    Set<TimelineMetric> metricsEntity = new HashSet<>();
+    TimelineMetric mEntity = new TimelineMetric();
+    mEntity.setId("MAP_SLOT_MILLIS");
+    mEntity.addValue(ts - 20, 10.5);
+    metricsEntity.add(mEntity);
+    entity.addMetrics(metricsEntity);
+    teEntity.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      // Writing application entity.
+      try {
+        hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp);
+        Assert.fail("Expected an exception as metric values are non integral");
+      } catch (IOException e) {}
+
+      // Writing generic entity.
+      try {
+        hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity);
+        Assert.fail("Expected an exception as metric values are non integral");
+      } catch (IOException e) {}
+      hbi.stop();
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6f57ecc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index b0f83b7..4fb8f0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -164,10 +165,10 @@ public class TestHBaseStorageFlowRun {
         .getBytes());
 
     assertEquals(2, r1.size());
-    long starttime = (Long) GenericObjectMapper.read(values
-        .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+    long starttime = Bytes.toLong(values.get(
+        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
     assertEquals(minStartTs, starttime);
-    assertEquals(endTs, GenericObjectMapper.read(values
+    assertEquals(endTs, Bytes.toLong(values
         .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
 
     // use the timeline reader to verify data
@@ -253,10 +254,10 @@ public class TestHBaseStorageFlowRun {
         }
         switch (id) {
         case metric1:
-          assertEquals(141, value);
+          assertEquals(141L, value);
           break;
         case metric2:
-          assertEquals(57, value);
+          assertEquals(57L, value);
           break;
         default:
           fail("unrecognized metric: " + id);
@@ -292,14 +293,14 @@ public class TestHBaseStorageFlowRun {
       byte[] q = ColumnHelper.getColumnQualifier(
           FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
       assertTrue(values.containsKey(q));
-      assertEquals(141, GenericObjectMapper.read(values.get(q)));
+      assertEquals(141L, Bytes.toLong(values.get(q)));
 
       // check metric2
       assertEquals(2, values.size());
       q = ColumnHelper.getColumnQualifier(
           FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
       assertTrue(values.containsKey(q));
-      assertEquals(57, GenericObjectMapper.read(values.get(q)));
+      assertEquals(57L, Bytes.toLong(values.get(q)));
     }
     assertEquals(1, rowCount);
   }