You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:48:53 UTC
[07/50] [abbrv] hadoop git commit: YARN-4986. Add a check in the
coprocessor for the table to be operated on (Vrushali C via sjlee)
YARN-4986. Add a check in the coprocessor for the table to be operated on (Vrushali C via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4caa1461
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4caa1461
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4caa1461
Branch: refs/heads/YARN-2928
Commit: 4caa1461bf526938c14f7b305595112f02340d26
Parents: 78ffdf0
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Apr 29 17:13:32 2016 -0700
Committer: Vrushali <vr...@twitter.com>
Committed: Sun Jun 19 00:20:06 2016 -0700
----------------------------------------------------------------------
.../storage/common/TimelineStorageUtils.java | 20 +++++++
.../storage/entity/EntityTable.java | 2 +-
.../storage/flow/FlowRunCoprocessor.java | 39 +++++++++++--
.../storage/flow/FlowScanner.java | 13 +++--
.../storage/flow/TestHBaseStorageFlowRun.java | 61 ++++++++++++++++++++
.../flow/TestHBaseStorageFlowRunCompaction.java | 36 ++++++++++++
6 files changed, 160 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 2d85bab..18f975a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -32,8 +32,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Result;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Fiel
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
@@ -887,4 +890,21 @@ public final class TimelineStorageUtils {
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
entity.addEvents(eventsSet);
}
+
+ public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
+ Configuration conf) {
+ String regionTableName = hRegionInfo.getTable().getNameAsString();
+ String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
+ FlowRunTable.DEFAULT_TABLE_NAME);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("regionTableName=" + regionTableName);
+ }
+ if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" table is the flow run table!! " + flowRunTableName);
+ }
+ return true;
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 3e3e3ab..b194f07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -84,7 +84,7 @@ public class EntityTable extends BaseTable<EntityTable> {
+ ".table.metrics.ttl";
/** default value for entity table name. */
- private static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
/** default TTL is 30 days for metrics timeseries. */
private static final int DEFAULT_METRICS_TTL = 2592000;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 450640a..8ea51a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen
public class FlowRunCoprocessor extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
+ private boolean isFlowRunRegion = false;
private HRegion region;
/**
@@ -70,9 +71,15 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
this.region = env.getRegion();
+ isFlowRunRegion = TimelineStorageUtils.isFlowRunTable(
+ region.getRegionInfo(), env.getConfiguration());
}
}
+ public boolean isFlowRunRegion() {
+ return isFlowRunRegion;
+ }
+
/*
* (non-Javadoc)
*
@@ -93,6 +100,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
WALEdit edit, Durability durability) throws IOException {
Map<String, byte[]> attributes = put.getAttributesMap();
+ if (!isFlowRunRegion) {
+ return;
+ }
// Assumption is that all the cells in a put are the same operation.
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
@@ -160,6 +170,10 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
Get get, List<Cell> results) throws IOException {
+ if (!isFlowRunRegion) {
+ return;
+ }
+
Scan scan = new Scan(get);
scan.setMaxVersions();
RegionScanner scanner = null;
@@ -190,11 +204,14 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
@Override
public RegionScanner preScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
- RegionScanner s) throws IOException {
- // set max versions for scan to see all
- // versions to aggregate for metrics
- scan.setMaxVersions();
- return s;
+ RegionScanner scanner) throws IOException {
+
+ if (isFlowRunRegion) {
+ // set max versions for scan to see all
+ // versions to aggregate for metrics
+ scan.setMaxVersions();
+ }
+ return scanner;
}
/*
@@ -213,6 +230,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
public RegionScanner postScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner scanner) throws IOException {
+ if (!isFlowRunRegion) {
+ return scanner;
+ }
return new FlowScanner(e.getEnvironment(), scan.getBatch(),
scanner, FlowScannerOperation.READ);
}
@@ -221,6 +241,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
public InternalScanner preFlush(
ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
+ if (!isFlowRunRegion) {
+ return scanner;
+ }
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("preFlush store = " + store.getColumnFamilyName()
@@ -241,6 +264,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) {
+ if (!isFlowRunRegion) {
+ return;
+ }
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("postFlush store = " + store.getColumnFamilyName()
@@ -262,6 +288,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
InternalScanner scanner, ScanType scanType, CompactionRequest request)
throws IOException {
+ if (!isFlowRunRegion) {
+ return scanner;
+ }
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
if (request != null) {
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 0ace529..398d7b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -210,7 +210,7 @@ class FlowScanner implements RegionScanner, Closeable {
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
if (converter != null && isNumericConverter(converter)) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
- (NumericValueConverter)converter, currentTimestamp);
+ converter, currentTimestamp);
}
resetState(currentColumnCells, alreadySeenAggDim);
currentColumnQualifier = newColumnQualifier;
@@ -219,6 +219,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
// No operation needs to be performed on non numeric converters.
if (!isNumericConverter(converter)) {
+ currentColumnCells.add(cell);
nextCell(cellLimit);
continue;
}
@@ -228,7 +229,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
if (!currentColumnCells.isEmpty()) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
- (NumericValueConverter)converter, currentTimestamp);
+ converter, currentTimestamp);
if (LOG.isDebugEnabled()) {
if (addedCnt > 0) {
LOG.debug("emitted cells. " + addedCnt + " for " + this.action
@@ -345,7 +346,7 @@ class FlowScanner implements RegionScanner, Closeable {
* parameter.
*/
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
- AggregationOperation currentAggOp, NumericValueConverter converter,
+ AggregationOperation currentAggOp, ValueConverter converter,
long currentTimestamp) throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0;
@@ -372,12 +373,14 @@ class FlowScanner implements RegionScanner, Closeable {
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case READ:
- Cell sumCell = processSummation(currentColumnCells, converter);
+ Cell sumCell = processSummation(currentColumnCells,
+ (NumericValueConverter) converter);
cells.add(sumCell);
return 1;
case MAJOR_COMPACTION:
List<Cell> finalCells = processSummationMajorCompaction(
- currentColumnCells, converter, currentTimestamp);
+ currentColumnCells, (NumericValueConverter) converter,
+ currentTimestamp);
cells.addAll(finalCells);
return finalCells.size();
default:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index a724db2..801d43c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -19,18 +19,21 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.EnumSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -38,6 +41,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -57,6 +62,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -84,6 +91,60 @@ public class TestHBaseStorageFlowRun {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
+ @Test
+ public void checkCoProcessorOff() throws IOException, InterruptedException {
+ Configuration hbaseConf = util.getConfiguration();
+ TableName table = TableName.valueOf(hbaseConf.get(
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+ Connection conn = null;
+ conn = ConnectionFactory.createConnection(hbaseConf);
+ Admin admin = conn.getAdmin();
+ if (admin == null) {
+ throw new IOException("Can't check tables since admin is null");
+ }
+ if (admin.tableExists(table)) {
+ // check the regions.
+ // check in flow run table
+ util.waitUntilAllRegionsAssigned(table);
+ HRegionServer server = util.getRSForFirstRegionInTable(table);
+ List<HRegion> regions = server.getOnlineRegions(table);
+ for (HRegion region : regions) {
+ assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+ hbaseConf));
+ }
+ }
+
+ table = TableName.valueOf(hbaseConf.get(
+ FlowActivityTable.TABLE_NAME_CONF_NAME,
+ FlowActivityTable.DEFAULT_TABLE_NAME));
+ if (admin.tableExists(table)) {
+ // check the regions.
+ // check in flow activity table
+ util.waitUntilAllRegionsAssigned(table);
+ HRegionServer server = util.getRSForFirstRegionInTable(table);
+ List<HRegion> regions = server.getOnlineRegions(table);
+ for (HRegion region : regions) {
+ assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+ hbaseConf));
+ }
+ }
+
+ table = TableName.valueOf(hbaseConf.get(
+ EntityTable.TABLE_NAME_CONF_NAME,
+ EntityTable.DEFAULT_TABLE_NAME));
+ if (admin.tableExists(table)) {
+ // check the regions.
+ // check in entity run table
+ util.waitUntilAllRegionsAssigned(table);
+ HRegionServer server = util.getRSForFirstRegionInTable(table);
+ List<HRegion> regions = server.getOnlineRegions(table);
+ for (HRegion region : regions) {
+ assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+ hbaseConf));
+ }
+ }
+ }
+
/**
* Writes 4 timeline entities belonging to one flow run through the
* {@link HBaseTimelineWriterImpl}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4caa1461/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 2738e6a..e7e7ba4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -87,6 +89,40 @@ public class TestHBaseStorageFlowRunCompaction {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
+ /** writes non numeric data into flow run table
+ * reads it back
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWriteNonNumericData() throws Exception {
+ String rowKey = "nonNumericRowKey";
+ String column = "nonNumericColumnName";
+ String value = "nonNumericValue";
+ byte[] rowKeyBytes = Bytes.toBytes(rowKey);
+ byte[] columnNameBytes = Bytes.toBytes(column);
+ byte[] valueBytes = Bytes.toBytes(value);
+ Put p = new Put(rowKeyBytes);
+ p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+ valueBytes);
+ Configuration hbaseConf = util.getConfiguration();
+ TableName table = TableName.valueOf(hbaseConf.get(
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+ Connection conn = null;
+ conn = ConnectionFactory.createConnection(hbaseConf);
+ Table flowRunTable = conn.getTable(table);
+ flowRunTable.put(p);
+
+ Get g = new Get(rowKeyBytes);
+ Result r = flowRunTable.get(g);
+ assertNotNull(r);
+ assertTrue(r.size() >= 1);
+ Cell actualValue = r.getColumnLatestCell(
+ FlowRunColumnFamily.INFO.getBytes(), columnNameBytes);
+ assertNotNull(CellUtil.cloneValue(actualValue));
+ assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value);
+ }
+
@Test
public void testWriteFlowRunCompaction() throws Exception {
String cluster = "kompaction_cluster1";
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org