You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/07/13 10:08:58 UTC
apex-malhar git commit: APEXMALHAR-1957: Added threading for reading
data from hbase. Added support for progressive read. Added unit tests.
Repository: apex-malhar
Updated Branches:
refs/heads/master 67b84dda4 -> 0b66f19d1
APEXMALHAR-1957: Added threading for reading data from hbase. Added support for progressive read. Added unit tests.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0b66f19d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0b66f19d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0b66f19d
Branch: refs/heads/master
Commit: 0b66f19d14a518b83059c74573dc7fdd58693788
Parents: 67b84dd
Author: bhupesh <bh...@gmail.com>
Authored: Thu Dec 24 18:50:25 2015 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Wed Jul 13 11:53:06 2016 +0530
----------------------------------------------------------------------
.../contrib/hbase/HBaseFieldValueGenerator.java | 59 ++++++
.../contrib/hbase/HBaseGetOperator.java | 2 +-
.../contrib/hbase/HBaseInputOperator.java | 39 +---
.../contrib/hbase/HBasePOJOInputOperator.java | 156 ++++++++--------
.../contrib/hbase/HBaseScanOperator.java | 181 ++++++++++++++++++-
.../contrib/hbase/HBaseGetOperatorTest.java | 8 +-
.../hbase/HBasePOJOInputOperatorTest.java | 37 +++-
.../contrib/hbase/HBaseScanOperatorTest.java | 6 +-
8 files changed, 351 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
new file mode 100644
index 0000000..52b6f4b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hbase;
+
+import java.util.List;
+
+import com.datatorrent.lib.util.FieldValueGenerator;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A {@link FieldValueGenerator} implementation for {@link HBaseFieldInfo}
+ */
+public class HBaseFieldValueGenerator extends FieldValueGenerator<HBaseFieldInfo>
+{
+ public static final String COLON = ":";
+
+ @SuppressWarnings("unchecked")
+ protected HBaseFieldValueGenerator(final Class<?> clazz, List<HBaseFieldInfo> fieldInfos)
+ {
+ for (HBaseFieldInfo fieldInfo : fieldInfos) {
+ fieldInfoMap.put(fieldInfo.getFamilyName() + COLON + fieldInfo.getColumnName(), fieldInfo);
+
+ PojoUtils.Getter<Object, Object> getter =
+ PojoUtils.createGetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+ fieldGetterMap.put(fieldInfo, getter);
+ }
+
+ for (HBaseFieldInfo fieldInfo : fieldInfos) {
+ PojoUtils.Setter<Object, Object> setter =
+ PojoUtils.createSetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+ fieldSetterMap.put(fieldInfo, setter);
+ }
+ }
+
+ public void setColumnValue(Object instance, String columnName, String columnFamily, Object value,
+ ValueConverter<HBaseFieldInfo> valueConverter)
+ {
+ HBaseFieldInfo fieldInfo = fieldInfoMap.get(columnFamily + COLON + columnName);
+ PojoUtils.Setter<Object, Object> setter = fieldSetterMap.get(fieldInfo);
+ setter.set(instance, valueConverter == null ? value : valueConverter.convertValue(fieldInfo, value));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
index 0f583e0..37270d5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
@@ -47,7 +47,7 @@ public abstract class HBaseGetOperator<T> extends HBaseInputOperator<T>
{
try {
Get get = operationGet();
- Result result = table.get(get);
+ Result result = getStore().getTable().get(get);
KeyValue[] kvs = result.raw();
//T t = getTuple(kvs);
//T t = getTuple(result);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
index 121e703..6f11621 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
@@ -18,10 +18,8 @@
*/
package com.datatorrent.contrib.hbase;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
-import java.io.IOException;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
/**
* A base implementation of hbase input operator which derives from HBaseOperatorBase. <br>
@@ -33,39 +31,6 @@ import java.io.IOException;
* @param <T> The tuple type
* @since 0.3.2
*/
-public abstract class HBaseInputOperator<T> extends HBaseOperatorBase implements InputOperator
+public abstract class HBaseInputOperator<T> extends AbstractStoreInputOperator<T, HBaseStore> implements InputOperator
{
- /**
- * Output port that emits tuples into the DAG.
- */
- public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
-
- //protected abstract T getTuple(Result result);
- //protected abstract T getTuple(KeyValue kv);
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- try{
- setupConfiguration();
- } catch (IOException ie) {
- throw new RuntimeException(ie);
- }
- }
-
- @Override
- public void teardown()
- {
- }
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
index 4182e84..e459ec7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
@@ -18,14 +18,14 @@
*/
package com.datatorrent.contrib.hbase;
-import java.io.IOException;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -34,27 +34,30 @@ import com.datatorrent.lib.util.FieldValueGenerator.ValueConverter;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Setter;
import com.datatorrent.lib.util.TableInfo;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Context.OperatorContext;
/**
+ * HBasePOJOInputOperator reads data from an HBase store, converts it to a POJO and puts it on the output port.
+ * The read from HBase is asynchronous.
* @displayName HBase Input Operator
* @category Input
* @tags database, nosql, pojo, hbase
* @since 3.1.0
*/
@Evolving
-public class HBasePOJOInputOperator extends HBaseInputOperator<Object>
+public class HBasePOJOInputOperator extends HBaseScanOperator<Object>
{
private TableInfo<HBaseFieldInfo> tableInfo;
- protected HBaseStore store;
private String pojoTypeName;
- private String startRow;
- private String lastReadRow;
- protected transient Class pojoType;
- private transient Setter<Object, String> rowSetter;
+ // Transients
+ protected transient Class<?> pojoType;
protected transient FieldValueGenerator<HBaseFieldInfo> fieldValueGenerator;
protected transient BytesValueConverter valueConverter;
+ private transient Scan scan;
+ private transient Setter<Object, String> rowSetter;
public static class BytesValueConverter implements ValueConverter<HBaseFieldInfo>
{
@@ -65,120 +68,121 @@ public class HBasePOJOInputOperator extends HBaseInputOperator<Object>
}
}
- @Override
- public void setup(OperatorContext context)
+ public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
{
- try {
- store.connect();
- pojoType = Class.forName(pojoTypeName);
- pojoType.newInstance(); //try create new instance to verify the class.
- rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
- fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
- valueConverter = new BytesValueConverter();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
+ public void setup(com.datatorrent.api.Context.PortContext context)
+ {
+ pojoType = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
}
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- }
+ };
@Override
- public void teardown()
+ public void activate(Context context)
{
try {
- store.disconnect();
- } catch (IOException ex) {
+ pojoType.newInstance(); // try create new instance to verify the class.
+ rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
+ fieldValueGenerator = new HBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo());
+ valueConverter = new BytesValueConverter();
+ scan = new Scan();
+ super.activate(context);
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
- public void emitTuples()
+ protected Object getTuple(Result result)
{
try {
- Scan scan = nextScan();
- if (scan == null)
- return;
-
- ResultScanner resultScanner = store.getTable().getScanner(scan);
-
- while (true) {
- Result result = resultScanner.next();
- if (result == null)
- break;
-
- String readRow = Bytes.toString(result.getRow());
- if( readRow.equals( lastReadRow ))
- continue;
-
- Object instance = pojoType.newInstance();
- rowSetter.set(instance, readRow);
-
- List<Cell> cells = result.listCells();
+ String readRow = Bytes.toString(result.getRow());
+ if( readRow.equals( getLastReadRow() )) {
+ return null;
+ }
- for (Cell cell : cells) {
- String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
- byte[] value = CellUtil.cloneValue(cell);
- fieldValueGenerator.setColumnValue( instance, columnName, value, valueConverter );
- }
+ Object instance = pojoType.newInstance();
+ rowSetter.set(instance, readRow);
- outputPort.emit(instance);
- lastReadRow = readRow;
+ List<Cell> cells = result.listCells();
+ for (Cell cell : cells) {
+ String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
+ String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
+ byte[] value = CellUtil.cloneValue(cell);
+ ((HBaseFieldValueGenerator)fieldValueGenerator).setColumnValue(instance, columnName, columnFamily, value,
+ valueConverter);
}
+ setLastReadRow(readRow);
+ return instance;
} catch (Exception e) {
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
-
}
- protected Scan nextScan()
+ @Override
+ protected Scan operationScan()
{
- if(lastReadRow==null && startRow==null )
- return new Scan();
- else
- return new Scan( Bytes.toBytes( lastReadRow == null ? startRow : lastReadRow ) );
+ if (getLastReadRow() == null && getStartRow() == null) {
+ // If no start row specified and no row read yet
+ if (scan == null) {
+ scan = new Scan();
+ }
+ } else if (getEndRow() == null) {
+ // If only start row specified
+ scan.setStartRow(Bytes.toBytes(getLastReadRow() == null ? getStartRow() : getLastReadRow()));
+ } else {
+ // If end row also specified
+ scan.setStartRow(Bytes.toBytes(getLastReadRow() == null ? getStartRow() : getLastReadRow()));
+ scan.setStopRow(Bytes.toBytes(getEndRow()));
+ }
+ for (HBaseFieldInfo field : tableInfo.getFieldsInfo()) {
+ scan.addColumn(Bytes.toBytes(field.getFamilyName()), Bytes.toBytes(field.getColumnName()));
+ }
+ return scan;
}
- public HBaseStore getStore()
- {
- return store;
- }
- public void setStore(HBaseStore store)
+ @Override
+ protected void emitTuple(Object tuple)
{
- this.store = store;
+ outputPort.emit(tuple);
}
+ /**
+ * Returns the {@link #tableInfo} object as configured
+ * @return {@link #tableInfo}
+ */
public TableInfo<HBaseFieldInfo> getTableInfo()
{
return tableInfo;
}
+ /**
+ * Sets the {@link #tableInfo} object
+ * @param tableInfo
+ */
public void setTableInfo(TableInfo<HBaseFieldInfo> tableInfo)
{
this.tableInfo = tableInfo;
}
+ /**
+ * Returns the POJO class name
+ * @return {@link #pojoTypeName}
+ */
public String getPojoTypeName()
{
return pojoTypeName;
}
+ /**
+ * Sets the POJO class name
+ * @param pojoTypeName
+ */
public void setPojoTypeName(String pojoTypeName)
{
this.pojoTypeName = pojoTypeName;
}
- public String getStartRow()
- {
- return startRow;
- }
+ private static final Logger logger = LoggerFactory.getLogger(HBasePOJOInputOperator.class);
- public void setStartRow(String startRow)
- {
- this.startRow = startRow;
- }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
index 1b0d657..b694e67 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
@@ -18,8 +18,19 @@
*/
package com.datatorrent.contrib.hbase;
+import java.io.IOException;
+import java.util.Queue;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.google.common.collect.Queues;
+
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -40,27 +51,121 @@ import org.apache.hadoop.hbase.client.ResultScanner;
* @tags hbase, scan, input operator
* @since 0.3.2
*/
-public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T>
+public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T> implements Operator.ActivationListener<Context>
{
+ public static final int DEF_QUEUE_SIZE = 1000;
+ public static final int DEF_SLEEP_MILLIS = 10;
+
+ private String startRow;
+ private String endRow;
+ private String lastReadRow;
+ private int queueSize = DEF_QUEUE_SIZE;
+ private int sleepMillis = DEF_SLEEP_MILLIS;
+ private Queue<Result> resultQueue;
+ private volatile boolean running;
+
+ @AutoMetric
+ protected long tuplesRead;
+
+ // Transients
+ protected transient Scan scan;
+ protected transient ResultScanner scanner;
+ protected transient Thread readThread;
+ private transient String threadFailureReason = null;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ resultQueue = Queues.newLinkedBlockingQueue(queueSize);
+ }
+
+ @Override
+ public void activate(Context context)
+ {
+ startReadThread();
+ }
+
+ protected void startReadThread()
+ {
+ try {
+ scan = operationScan();
+ scanner = getStore().getTable().getScanner(scan);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ readThread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ Result result;
+ while (true) {
+ if ((result = scanner.next()) != null) {
+ while ( running && !resultQueue.offer(result)) {
+ Thread.sleep(sleepMillis);
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.debug("Exception in fetching results {}", e.getMessage());
+ threadFailureReason = e.getMessage();
+ } finally {
+ scanner.close();
+ }
+ }
+ });
+ readThread.start();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ tuplesRead = 0;
+ running = true;
+ }
@Override
public void emitTuples()
{
+ if (!readThread.isAlive()) {
+ throw new RuntimeException(threadFailureReason);
+ }
try {
- HTable table = getTable();
- Scan scan = operationScan();
- ResultScanner scanner = table.getScanner(scan);
- for (Result result : scanner) {
- //KeyValue[] kvs = result.raw();
- //T t = getTuple(kvs);
- T t = getTuple(result);
- outputPort.emit(t);
+ Result result = resultQueue.poll();
+ if (result == null) {
+ return;
+ }
+ T tuple = getTuple(result);
+ if (tuple != null) {
+ emitTuple(tuple);
+ tuplesRead++;
}
} catch (Exception e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
+ protected void emitTuple(T tuple)
+ {
+ outputPort.emit(tuple);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ running = false;
+ super.endWindow();
+ }
+
+ @Override
+ public void deactivate()
+ {
+ readThread.interrupt();
+ }
+
/**
* Return a HBase Scan metric to retrieve the tuple.
* The implementor should return a HBase Scan metric that specifies where to retrieve the tuple from
@@ -79,4 +184,60 @@ public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T>
*/
protected abstract T getTuple(Result result);
+ /**
+ * Returns the start row key in the table as set previously
+ * @return {@link #startRow}
+ */
+ public String getStartRow()
+ {
+ return startRow;
+ }
+
+ /**
+ * Sets the start row key in the table from where the scan should begin
+ * @param startRow
+ */
+ public void setStartRow(String startRow)
+ {
+ this.startRow = startRow;
+ }
+
+ /**
+ * Returns the end row key in the table as set previously
+ * @return {@link #endRow}
+ */
+ public String getEndRow()
+ {
+ return endRow;
+ }
+
+ /**
+ * Sets the end row key in the table where the scan should end
+ * @param endRow
+ */
+ public void setEndRow(String endRow)
+ {
+ this.endRow = endRow;
+ }
+
+ /**
+ * Returns the last read row key from the hbase table
+ * @return {@link #lastReadRow}
+ */
+ public String getLastReadRow()
+ {
+ return lastReadRow;
+ }
+
+ /**
+ * Sets the last read row key from the HBase table. After a failure, the new scan will start from this row key.
+ * @param lastReadRow
+ */
+ public void setLastReadRow(String lastReadRow)
+ {
+ this.lastReadRow = lastReadRow;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(HBaseScanOperator.class);
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
index 9c60507..44f2323 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
@@ -51,12 +51,14 @@ public class HBaseGetOperatorTest
dag.setAttribute(DAG.APPLICATION_NAME, "HBaseGetOperatorTest");
TestHBaseGetOperator thop = dag.addOperator("testhbaseget", TestHBaseGetOperator.class);
+ HBaseStore store = new HBaseStore();
+ thop.setStore(store);
HBaseTupleCollector tc = dag.addOperator("tuplecollector", HBaseTupleCollector.class);
dag.addStream("ss", thop.outputPort, tc.inputPort);
- thop.setTableName("table1");
- thop.setZookeeperQuorum("127.0.0.1");
- thop.setZookeeperClientPort(2181);
+ thop.getStore().setTableName("table1");
+ thop.getStore().setZookeeperQuorum("127.0.0.1");
+ thop.getStore().setZookeeperClientPort(2181);
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
index d0b3e94..4e6bb39 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
@@ -28,10 +28,13 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.internal.runners.statements.Fail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
@@ -59,17 +62,33 @@ public class HBasePOJOInputOperatorTest
this.setTupleType( TestPOJO.class );
}
}
-
+
+ public static class TestHBasePOJOInputOperator extends HBasePOJOInputOperator
+ {
+ @Override
+ public void setup(OperatorContext context)
+ {
+ try {
+ // Added to let the output operator insert data into the HBase table before the input operator can read it
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ super.setup(context);
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger( HBasePOJOInputOperatorTest.class );
private final int TUPLE_NUM = 1000;
+ private final long RUN_DURATION = 30000; // time in ms
private HBaseStore store;
private HBasePOJOPutOperator hbaseOutputOperator;
- private HBasePOJOInputOperator hbaseInputOperator;
+ private TestHBasePOJOInputOperator hbaseInputOperator;
@Before
public void prepare() throws Exception
{
- hbaseInputOperator = new HBasePOJOInputOperator();
+ hbaseInputOperator = new TestHBasePOJOInputOperator();
hbaseOutputOperator = new HBasePOJOPutOperator();
setupOperators();
HBaseUtil.createTable( store.getConfiguration(), store.getTableName());
@@ -104,6 +123,7 @@ public class HBasePOJOInputOperatorTest
hbaseOutputOperator = dag.addOperator( OPERATOR.HBASEOUTPUT.name(), hbaseOutputOperator );
hbaseInputOperator = dag.addOperator(OPERATOR.HBASEINPUT.name(), hbaseInputOperator);
+ dag.setOutputPortAttribute(hbaseInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, TestPOJO.class);
TupleCacheOutputOperator output = dag.addOperator(OPERATOR.OUTPUT.name(), TupleCacheOutputOperator.class);
@@ -120,6 +140,7 @@ public class HBasePOJOInputOperatorTest
final LocalMode.Controller lc = lma.getController();
lc.runAsync();
+ long start = System.currentTimeMillis();
//generator.doneLatch.await();
while(true)
{
@@ -128,10 +149,14 @@ public class HBasePOJOInputOperatorTest
Thread.sleep(1000);
}
catch( Exception e ){}
-
+ logger.info("Tuple row key: ", output.getReceivedTuples());
logger.info( "Received tuple number {}, instance is {}.", output.getReceivedTuples() == null ? 0 : output.getReceivedTuples().size(), System.identityHashCode( output ) );
- if( output.getReceivedTuples() != null && output.getReceivedTuples().size() == TUPLE_NUM )
+ if( output.getReceivedTuples() != null && output.getReceivedTuples().size() == TUPLE_NUM ) {
break;
+ }
+ if(System.currentTimeMillis() - start > RUN_DURATION) {
+ throw new RuntimeException("Testcase taking too long");
+ }
}
lc.shutdown();
@@ -173,8 +198,6 @@ public class HBasePOJOInputOperatorTest
hbaseInputOperator.setStore(store);
hbaseOutputOperator.setStore(store);
- hbaseInputOperator.setPojoTypeName( TestPOJO.class.getName() );
-
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
OPERATOR_ID, new AttributeMap.DefaultAttributeMap());
hbaseInputOperator.setup(context);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
index e190841..254ef37 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
@@ -57,9 +57,9 @@ public class HBaseScanOperatorTest
HBaseTupleCollector tc = dag.addOperator("tuplecollector", HBaseTupleCollector.class);
dag.addStream("ss", thop.outputPort, tc.inputPort);
- thop.setTableName("table1");
- thop.setZookeeperQuorum("127.0.0.1");
- thop.setZookeeperClientPort(2181);
+ thop.getStore().setTableName("table1");
+ thop.getStore().setZookeeperQuorum("127.0.0.1");
+ thop.getStore().setZookeeperClientPort(2181);
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);