You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:48:53 UTC

[07/50] [abbrv] hadoop git commit: YARN-4986. Add a check in the coprocessor for table to operated on (Vrushali C via sjlee)

YARN-4986. Add a check in the coprocessor for table to operated on (Vrushali C via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4caa1461
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4caa1461
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4caa1461

Branch: refs/heads/YARN-2928
Commit: 4caa1461bf526938c14f7b305595112f02340d26
Parents: 78ffdf0
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Apr 29 17:13:32 2016 -0700
Committer: Vrushali <vr...@twitter.com>
Committed: Sun Jun 19 00:20:06 2016 -0700

----------------------------------------------------------------------
 .../storage/common/TimelineStorageUtils.java    | 20 +++++++
 .../storage/entity/EntityTable.java             |  2 +-
 .../storage/flow/FlowRunCoprocessor.java        | 39 +++++++++++--
 .../storage/flow/FlowScanner.java               | 13 +++--
 .../storage/flow/TestHBaseStorageFlowRun.java   | 61 ++++++++++++++++++++
 .../flow/TestHBaseStorageFlowRunCompaction.java | 36 ++++++++++++
 6 files changed, 160 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 2d85bab..18f975a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -32,8 +32,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Result;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Fiel
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
@@ -887,4 +890,21 @@ public final class TimelineStorageUtils {
     Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
     entity.addEvents(eventsSet);
   }
+
+  public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
+      Configuration conf) {
+    String regionTableName = hRegionInfo.getTable().getNameAsString();
+    String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
+        FlowRunTable.DEFAULT_TABLE_NAME);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("regionTableName=" + regionTableName);
+    }
+    if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" table is the flow run table!! " + flowRunTableName);
+      }
+      return true;
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 3e3e3ab..b194f07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -84,7 +84,7 @@ public class EntityTable extends BaseTable<EntityTable> {
       + ".table.metrics.ttl";
 
   /** default value for entity table name. */
-  private static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
 
   /** default TTL is 30 days for metrics timeseries. */
   private static final int DEFAULT_METRICS_TTL = 2592000;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 450640a..8ea51a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen
 public class FlowRunCoprocessor extends BaseRegionObserver {
 
   private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
+  private boolean isFlowRunRegion = false;
 
   private HRegion region;
   /**
@@ -70,9 +71,15 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
       this.region = env.getRegion();
+      isFlowRunRegion = TimelineStorageUtils.isFlowRunTable(
+          region.getRegionInfo(), env.getConfiguration());
     }
   }
 
+  public boolean isFlowRunRegion() {
+    return isFlowRunRegion;
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -93,6 +100,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
       WALEdit edit, Durability durability) throws IOException {
     Map<String, byte[]> attributes = put.getAttributesMap();
 
+    if (!isFlowRunRegion) {
+      return;
+    }
     // Assumption is that all the cells in a put are the same operation.
     List<Tag> tags = new ArrayList<>();
     if ((attributes != null) && (attributes.size() > 0)) {
@@ -160,6 +170,10 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
   @Override
   public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
       Get get, List<Cell> results) throws IOException {
+    if (!isFlowRunRegion) {
+      return;
+    }
+
     Scan scan = new Scan(get);
     scan.setMaxVersions();
     RegionScanner scanner = null;
@@ -190,11 +204,14 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
   @Override
   public RegionScanner preScannerOpen(
       ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
-      RegionScanner s) throws IOException {
-    // set max versions for scan to see all
-    // versions to aggregate for metrics
-    scan.setMaxVersions();
-    return s;
+      RegionScanner scanner) throws IOException {
+
+    if (isFlowRunRegion) {
+      // set max versions for scan to see all
+      // versions to aggregate for metrics
+      scan.setMaxVersions();
+    }
+    return scanner;
   }
 
   /*
@@ -213,6 +230,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
   public RegionScanner postScannerOpen(
       ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
       RegionScanner scanner) throws IOException {
+    if (!isFlowRunRegion) {
+      return scanner;
+    }
     return new FlowScanner(e.getEnvironment(), scan.getBatch(),
         scanner, FlowScannerOperation.READ);
   }
@@ -221,6 +241,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
   public InternalScanner preFlush(
       ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       InternalScanner scanner) throws IOException {
+    if (!isFlowRunRegion) {
+      return scanner;
+    }
     if (LOG.isDebugEnabled()) {
       if (store != null) {
         LOG.debug("preFlush store = " + store.getColumnFamilyName()
@@ -241,6 +264,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
   @Override
   public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, StoreFile resultFile) {
+    if (!isFlowRunRegion) {
+      return;
+    }
     if (LOG.isDebugEnabled()) {
       if (store != null) {
         LOG.debug("postFlush store = " + store.getColumnFamilyName()
@@ -262,6 +288,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
       InternalScanner scanner, ScanType scanType, CompactionRequest request)
       throws IOException {
 
+    if (!isFlowRunRegion) {
+      return scanner;
+    }
     FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
     if (request != null) {
       requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 0ace529..398d7b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -210,7 +210,7 @@ class FlowScanner implements RegionScanner, Closeable {
       if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
         if (converter != null && isNumericConverter(converter)) {
           addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-              (NumericValueConverter)converter, currentTimestamp);
+              converter, currentTimestamp);
         }
         resetState(currentColumnCells, alreadySeenAggDim);
         currentColumnQualifier = newColumnQualifier;
@@ -219,6 +219,7 @@ class FlowScanner implements RegionScanner, Closeable {
       }
       // No operation needs to be performed on non numeric converters.
       if (!isNumericConverter(converter)) {
+        currentColumnCells.add(cell);
         nextCell(cellLimit);
         continue;
       }
@@ -228,7 +229,7 @@ class FlowScanner implements RegionScanner, Closeable {
     }
     if (!currentColumnCells.isEmpty()) {
       addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-          (NumericValueConverter)converter, currentTimestamp);
+          converter, currentTimestamp);
       if (LOG.isDebugEnabled()) {
         if (addedCnt > 0) {
           LOG.debug("emitted cells. " + addedCnt + " for " + this.action
@@ -345,7 +346,7 @@ class FlowScanner implements RegionScanner, Closeable {
    * parameter.
    */
   private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp, NumericValueConverter converter,
+      AggregationOperation currentAggOp, ValueConverter converter,
       long currentTimestamp) throws IOException {
     if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
       return 0;
@@ -372,12 +373,14 @@ class FlowScanner implements RegionScanner, Closeable {
         cells.addAll(currentColumnCells);
         return currentColumnCells.size();
       case READ:
-        Cell sumCell = processSummation(currentColumnCells, converter);
+        Cell sumCell = processSummation(currentColumnCells,
+            (NumericValueConverter) converter);
         cells.add(sumCell);
         return 1;
       case MAJOR_COMPACTION:
         List<Cell> finalCells = processSummationMajorCompaction(
-            currentColumnCells, converter, currentTimestamp);
+            currentColumnCells, (NumericValueConverter) converter,
+            currentTimestamp);
         cells.addAll(finalCells);
         return finalCells.size();
       default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index a724db2..801d43c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -19,18 +19,21 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -38,6 +41,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -57,6 +62,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -84,6 +91,60 @@ public class TestHBaseStorageFlowRun {
     TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
   }
 
+  @Test
+  public void checkCoProcessorOff() throws IOException, InterruptedException {
+    Configuration hbaseConf = util.getConfiguration();
+    TableName table = TableName.valueOf(hbaseConf.get(
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = null;
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    Admin admin = conn.getAdmin();
+    if (admin == null) {
+      throw new IOException("Can't check tables since admin is null");
+    }
+    if (admin.tableExists(table)) {
+      // check the regions.
+      // check in flow run table
+      util.waitUntilAllRegionsAssigned(table);
+      HRegionServer server = util.getRSForFirstRegionInTable(table);
+      List<HRegion> regions = server.getOnlineRegions(table);
+      for (HRegion region : regions) {
+        assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+            hbaseConf));
+      }
+    }
+
+    table = TableName.valueOf(hbaseConf.get(
+        FlowActivityTable.TABLE_NAME_CONF_NAME,
+        FlowActivityTable.DEFAULT_TABLE_NAME));
+    if (admin.tableExists(table)) {
+      // check the regions.
+      // check in flow activity table
+      util.waitUntilAllRegionsAssigned(table);
+      HRegionServer server = util.getRSForFirstRegionInTable(table);
+      List<HRegion> regions = server.getOnlineRegions(table);
+      for (HRegion region : regions) {
+        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+            hbaseConf));
+      }
+    }
+
+    table = TableName.valueOf(hbaseConf.get(
+        EntityTable.TABLE_NAME_CONF_NAME,
+        EntityTable.DEFAULT_TABLE_NAME));
+    if (admin.tableExists(table)) {
+      // check the regions.
+      // check in entity table
+      util.waitUntilAllRegionsAssigned(table);
+      HRegionServer server = util.getRSForFirstRegionInTable(table);
+      List<HRegion> regions = server.getOnlineRegions(table);
+      for (HRegion region : regions) {
+        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+            hbaseConf));
+      }
+    }
+  }
+
   /**
    * Writes 4 timeline entities belonging to one flow run through the
    * {@link HBaseTimelineWriterImpl}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 2738e6a..e7e7ba4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -87,6 +89,40 @@ public class TestHBaseStorageFlowRunCompaction {
     TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
   }
 
+  /** Writes non-numeric data into the flow run table and
+   * reads it back, checking the value is returned unchanged.
+   *
+   * @throws Exception on any failure while writing or reading
+   */
+  @Test
+  public void testWriteNonNumericData() throws Exception {
+    String rowKey = "nonNumericRowKey";
+    String column = "nonNumericColumnName";
+    String value = "nonNumericValue";
+    byte[] rowKeyBytes = Bytes.toBytes(rowKey);
+    byte[] columnNameBytes = Bytes.toBytes(column);
+    byte[] valueBytes = Bytes.toBytes(value);
+    Put p = new Put(rowKeyBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    Configuration hbaseConf = util.getConfiguration();
+    TableName table = TableName.valueOf(hbaseConf.get(
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = null;
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    Table flowRunTable = conn.getTable(table);
+    flowRunTable.put(p);
+
+    Get g = new Get(rowKeyBytes);
+    Result r = flowRunTable.get(g);
+    assertNotNull(r);
+    assertTrue(r.size() >= 1);
+    Cell actualValue = r.getColumnLatestCell(
+        FlowRunColumnFamily.INFO.getBytes(), columnNameBytes);
+    assertNotNull(CellUtil.cloneValue(actualValue));
+    assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value);
+  }
+
   @Test
   public void testWriteFlowRunCompaction() throws Exception {
     String cluster = "kompaction_cluster1";


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org