You are viewing a plain-text version of this content. The canonical link to it is available from the original archive page (the hyperlink is not preserved in this plain-text view).
Posted to commits@apex.apache.org by ch...@apache.org on 2016/07/13 10:08:58 UTC

apex-malhar git commit: APEXMALHAR-1957: Added threading for reading data from hbase. Added support for progressive read. Added unit tests.

Repository: apex-malhar
Updated Branches:
  refs/heads/master 67b84dda4 -> 0b66f19d1


APEXMALHAR-1957: Added threading for reading data from hbase. Added support for progressive read. Added unit tests.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0b66f19d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0b66f19d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0b66f19d

Branch: refs/heads/master
Commit: 0b66f19d14a518b83059c74573dc7fdd58693788
Parents: 67b84dd
Author: bhupesh <bh...@gmail.com>
Authored: Thu Dec 24 18:50:25 2015 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Wed Jul 13 11:53:06 2016 +0530

----------------------------------------------------------------------
 .../contrib/hbase/HBaseFieldValueGenerator.java |  59 ++++++
 .../contrib/hbase/HBaseGetOperator.java         |   2 +-
 .../contrib/hbase/HBaseInputOperator.java       |  39 +---
 .../contrib/hbase/HBasePOJOInputOperator.java   | 156 ++++++++--------
 .../contrib/hbase/HBaseScanOperator.java        | 181 ++++++++++++++++++-
 .../contrib/hbase/HBaseGetOperatorTest.java     |   8 +-
 .../hbase/HBasePOJOInputOperatorTest.java       |  37 +++-
 .../contrib/hbase/HBaseScanOperatorTest.java    |   6 +-
 8 files changed, 351 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
new file mode 100644
index 0000000..52b6f4b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hbase;
+
+import java.util.List;
+
+import com.datatorrent.lib.util.FieldValueGenerator;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A {@link FieldValueGenerator} for {@link HBaseFieldInfo}; fields are keyed by "columnFamily:columnName".
+ */
+public class HBaseFieldValueGenerator extends FieldValueGenerator<HBaseFieldInfo>
+{
+  public static final String COLON = ":";
+
+  @SuppressWarnings("unchecked")
+  protected HBaseFieldValueGenerator(final Class<?> clazz, List<HBaseFieldInfo> fieldInfos)
+  {
+    // One pass per field: index it by "family:column" and build both its POJO getter and setter
+    for (HBaseFieldInfo fieldInfo : fieldInfos) {
+      fieldInfoMap.put(fieldInfo.getFamilyName() + COLON + fieldInfo.getColumnName(), fieldInfo);
+      PojoUtils.Getter<Object, Object> getter =
+          PojoUtils.createGetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+      fieldGetterMap.put(fieldInfo, getter);
+      PojoUtils.Setter<Object, Object> setter =
+          PojoUtils.createSetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+      fieldSetterMap.put(fieldInfo, setter);
+    }
+  }
+
+  /** Sets the POJO field mapped to columnFamily:columnName, applying valueConverter (if non-null) first. */
+  public void setColumnValue(Object instance, String columnName, String columnFamily, Object value,
+      ValueConverter<HBaseFieldInfo> valueConverter)
+  {
+    HBaseFieldInfo fieldInfo = fieldInfoMap.get(columnFamily + COLON + columnName);
+    if (fieldInfo == null) {
+      throw new IllegalArgumentException("No POJO field mapped to column " + columnFamily + COLON + columnName);
+    }
+    Object converted = valueConverter == null ? value : valueConverter.convertValue(fieldInfo, value);
+    fieldSetterMap.get(fieldInfo).set(instance, converted);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
index 0f583e0..37270d5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java
@@ -47,7 +47,7 @@ public abstract class HBaseGetOperator<T> extends HBaseInputOperator<T>
   {
     try {
       Get get = operationGet();
-      Result result = table.get(get);
+      Result result = getStore().getTable().get(get);
       KeyValue[] kvs = result.raw();
       //T t = getTuple(kvs);
       //T t = getTuple(result);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
index 121e703..6f11621 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java
@@ -18,10 +18,8 @@
  */
 package com.datatorrent.contrib.hbase;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-import java.io.IOException;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
 
 /**
  * A base implementation of hbase input operator which derives from HBaseOperatorBase. <br>
@@ -33,39 +31,6 @@ import java.io.IOException;
  * @param <T> The tuple type
  * @since 0.3.2
  */
-public abstract class HBaseInputOperator<T> extends HBaseOperatorBase implements InputOperator
+public abstract class HBaseInputOperator<T> extends AbstractStoreInputOperator<T, HBaseStore> implements InputOperator
 {
-  /**
-   * Output port that emits tuples into the DAG.
-   */
-  public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
-
-  //protected abstract T getTuple(Result result);
-  //protected abstract T getTuple(KeyValue kv);
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-  }
-
-  @Override
-  public void endWindow()
-  {
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    try{
-      setupConfiguration();
-    } catch (IOException ie) {
-      throw new RuntimeException(ie);
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
index 4182e84..e459ec7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.contrib.hbase;
 
-import java.io.IOException;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -34,27 +34,30 @@ import com.datatorrent.lib.util.FieldValueGenerator.ValueConverter;
 import com.datatorrent.lib.util.PojoUtils;
 import com.datatorrent.lib.util.PojoUtils.Setter;
 import com.datatorrent.lib.util.TableInfo;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Context.OperatorContext;
 
 /**
+ * HBasePOJOInputOperator reads data from a HBase store, converts it to a POJO and puts it on the output port.
+ * The read from HBase is asynchronous.
  * @displayName HBase Input Operator
  * @category Input
  * @tags database, nosql, pojo, hbase
  * @since 3.1.0
  */
 @Evolving
-public class HBasePOJOInputOperator extends HBaseInputOperator<Object>
+public class HBasePOJOInputOperator extends HBaseScanOperator<Object>
 {
   private TableInfo<HBaseFieldInfo> tableInfo;
-  protected HBaseStore store;
   private String pojoTypeName;
-  private String startRow;
-  private String lastReadRow;
 
-  protected transient Class pojoType;
-  private transient Setter<Object, String> rowSetter;
+  // Transients
+  protected transient Class<?> pojoType;
   protected transient FieldValueGenerator<HBaseFieldInfo> fieldValueGenerator;
   protected transient BytesValueConverter valueConverter;
+  private transient Scan scan;
+  private transient Setter<Object, String> rowSetter;
 
   public static class BytesValueConverter implements ValueConverter<HBaseFieldInfo>
   {
@@ -65,120 +68,121 @@ public class HBasePOJOInputOperator extends HBaseInputOperator<Object>
     }
   }
 
-  @Override
-  public void setup(OperatorContext context)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
   {
-    try {
-      store.connect();
-      pojoType = Class.forName(pojoTypeName);
-      pojoType.newInstance();   //try create new instance to verify the class.
-      rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
-      fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
-      valueConverter = new BytesValueConverter();
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
+    public void setup(com.datatorrent.api.Context.PortContext context)
+    {
+      pojoType = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
     }
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-  }
+  };
 
   @Override
-  public void teardown()
+  public void activate(Context context)
   {
     try {
-      store.disconnect();
-    } catch (IOException ex) {
+      pojoType.newInstance(); // try create new instance to verify the class.
+      rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
+      fieldValueGenerator = new HBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo());
+      valueConverter = new BytesValueConverter();
+      scan = new Scan();
+      super.activate(context);
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
 
   @Override
-  public void emitTuples()
+  protected Object getTuple(Result result)
   {
     try {
-      Scan scan = nextScan();
-      if (scan == null)
-        return;
-
-      ResultScanner resultScanner = store.getTable().getScanner(scan);
-
-      while (true) {
-        Result result = resultScanner.next();
-        if (result == null)
-          break;
-
-        String readRow = Bytes.toString(result.getRow());
-        if( readRow.equals( lastReadRow ))
-          continue;
-
-        Object instance = pojoType.newInstance();
-        rowSetter.set(instance, readRow);
-
-        List<Cell> cells = result.listCells();
+      String readRow = Bytes.toString(result.getRow());
+      if( readRow.equals( getLastReadRow() )) {
+        return null;
+      }
 
-        for (Cell cell : cells) {
-          String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
-          byte[] value = CellUtil.cloneValue(cell);
-          fieldValueGenerator.setColumnValue( instance, columnName, value, valueConverter );
-        }
+      Object instance = pojoType.newInstance();
+      rowSetter.set(instance, readRow);
 
-        outputPort.emit(instance);
-        lastReadRow = readRow;
+       List<Cell> cells = result.listCells();
+       for (Cell cell : cells) {
+         String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
+         String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
+        byte[] value = CellUtil.cloneValue(cell);
+         ((HBaseFieldValueGenerator)fieldValueGenerator).setColumnValue(instance, columnName, columnFamily, value,
+             valueConverter);
       }
 
+      setLastReadRow(readRow);
+      return instance;
     } catch (Exception e) {
-      throw new RuntimeException(e.getMessage());
+      throw new RuntimeException(e);
     }
-
   }
 
-  protected Scan nextScan()
+  @Override
+  protected Scan operationScan()
   {
-    if(lastReadRow==null && startRow==null )
-      return new Scan();
-    else
-      return new Scan( Bytes.toBytes( lastReadRow == null ? startRow : lastReadRow ) );
+    // Resume from the last emitted row (getTuple skips it as a duplicate), else from the configured start row
+    String firstRow = getLastReadRow() == null ? getStartRow() : getLastReadRow();
+    if (scan == null) {
+      scan = new Scan();
+    }
+    if (firstRow != null) {
+      scan.setStartRow(Bytes.toBytes(firstRow));
+    }
+    if (getEndRow() != null) {
+      // Applied independently of the start row so an end-only range is still bounded
+      scan.setStopRow(Bytes.toBytes(getEndRow()));
+    }
+    // Fetch only the columns that are mapped to POJO fields
+    for (HBaseFieldInfo field : tableInfo.getFieldsInfo()) {
+      scan.addColumn(Bytes.toBytes(field.getFamilyName()), Bytes.toBytes(field.getColumnName()));
+    }
+    return scan;
   }
 
-  public HBaseStore getStore()
-  {
-    return store;
-  }
-  public void setStore(HBaseStore store)
+  @Override
+  protected void emitTuple(Object tuple)
   {
-    this.store = store;
+    outputPort.emit(tuple);
   }
 
+  /**
+   * Returns the {@link #tableInfo} object as configured
+   * @return {@link #tableInfo}
+   */
   public TableInfo<HBaseFieldInfo> getTableInfo()
   {
     return tableInfo;
   }
 
+  /**
+   * Sets the {@link #tableInfo} object
+   * @param tableInfo
+   */
   public void setTableInfo(TableInfo<HBaseFieldInfo> tableInfo)
   {
     this.tableInfo = tableInfo;
   }
 
+  /**
+   * Returns the POJO class name
+   * @return {@link #pojoTypeName}
+   */
   public String getPojoTypeName()
   {
     return pojoTypeName;
   }
 
+  /**
+   * Sets the POJO class name
+   * @param pojoTypeName
+   */
   public void setPojoTypeName(String pojoTypeName)
   {
     this.pojoTypeName = pojoTypeName;
   }
 
-  public String getStartRow()
-  {
-    return startRow;
-  }
+  private static final Logger logger = LoggerFactory.getLogger(HBasePOJOInputOperator.class);
 
-  public void setStartRow(String startRow)
-  {
-    this.startRow = startRow;
-  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
index 1b0d657..b694e67 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java
@@ -18,8 +18,19 @@
  */
 package com.datatorrent.contrib.hbase;
 
+import java.io.IOException;
+import java.util.Queue;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.google.common.collect.Queues;
+
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -40,27 +51,121 @@ import org.apache.hadoop.hbase.client.ResultScanner;
  * @tags hbase, scan, input operator
  * @since 0.3.2
  */
-public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T>
+public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T> implements Operator.ActivationListener<Context>
 {
+  public static final int DEF_QUEUE_SIZE = 1000;
+  public static final int DEF_SLEEP_MILLIS = 10;
+
+  private String startRow;
+  private String endRow;
+  private String lastReadRow;
+  private int queueSize = DEF_QUEUE_SIZE;
+  private int sleepMillis = DEF_SLEEP_MILLIS;
+  private Queue<Result> resultQueue;
+  private volatile boolean running;
+
+  @AutoMetric
+  protected long tuplesRead;
+
+  // Transients
+  protected transient Scan scan;
+  protected transient ResultScanner scanner;
+  protected transient Thread readThread;
+  private transient String threadFailureReason = null;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    resultQueue = Queues.newLinkedBlockingQueue(queueSize);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    startReadThread();
+  }
+
+  protected void startReadThread()
+  {
+    try {
+      scan = operationScan();
+      scanner = getStore().getTable().getScanner(scan);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    readThread = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          Result result;
+          while (true) {
+            if ((result = scanner.next()) != null) {
+              while ( running && !resultQueue.offer(result)) {
+                Thread.sleep(sleepMillis);
+              }
+            }
+          }
+        } catch (Exception e) {
+          logger.debug("Exception in fetching results {}", e.getMessage());
+          threadFailureReason = e.getMessage();
+        } finally {
+          scanner.close();
+        }
+      }
+    });
+    readThread.start();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    tuplesRead = 0;
+    running = true;
+  }
 
   @Override
   public void emitTuples()
   {
+    if (!readThread.isAlive()) {
+      throw new RuntimeException(threadFailureReason);
+    }
     try {
-      HTable table = getTable();
-      Scan scan = operationScan();
-      ResultScanner scanner = table.getScanner(scan);
-      for (Result result : scanner) {
-        //KeyValue[] kvs = result.raw();
-        //T t = getTuple(kvs);
-        T t = getTuple(result);
-        outputPort.emit(t);
+      Result result = resultQueue.poll();
+      if (result == null) {
+        return;
+      }
+      T tuple = getTuple(result);
+      if (tuple != null) {
+        emitTuple(tuple);
+        tuplesRead++;
       }
     } catch (Exception e) {
-      e.printStackTrace();
+      throw new RuntimeException(e);
     }
   }
 
+  protected void emitTuple(T tuple)
+  {
+    outputPort.emit(tuple);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    running = false;
+    super.endWindow();
+  }
+
+  @Override
+  public void deactivate()
+  {
+    readThread.interrupt();
+  }
+
   /**
    * Return a HBase Scan metric to retrieve the tuple.
    * The implementor should return a HBase Scan metric that specifies where to retrieve the tuple from
@@ -79,4 +184,60 @@ public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T>
    */
   protected abstract T getTuple(Result result);
 
+  /**
+   * Returns the start row key in the table as set previously
+   * @return {@link #startRow}
+   */
+  public String getStartRow()
+  {
+    return startRow;
+  }
+
+  /**
+   * Sets the start row key in the table from where the scan should begin
+   * @param startRow
+   */
+  public void setStartRow(String startRow)
+  {
+    this.startRow = startRow;
+  }
+
+  /**
+   * Returns the end row key in the table as set previously
+   * @return {@link #endRow}
+   */
+  public String getEndRow()
+  {
+    return endRow;
+  }
+
+  /**
+   * Sets the end row key in the table where the scan should end
+   * @param endRow
+   */
+  public void setEndRow(String endRow)
+  {
+    this.endRow = endRow;
+  }
+
+  /**
+   * Returns the last read row key from the hbase table
+   * @return {@link #lastReadRow}
+   */
+  public String getLastReadRow()
+  {
+    return lastReadRow;
+  }
+
+  /**
+   * Sets the last read row key from the hbase table. After the failures, the new scan will start from this row key
+   * @param lastReadRow
+   */
+  public void setLastReadRow(String lastReadRow)
+  {
+    this.lastReadRow = lastReadRow;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HBaseScanOperator.class);
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
index 9c60507..44f2323 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java
@@ -51,12 +51,14 @@ public class HBaseGetOperatorTest
 
       dag.setAttribute(DAG.APPLICATION_NAME, "HBaseGetOperatorTest");
       TestHBaseGetOperator thop = dag.addOperator("testhbaseget", TestHBaseGetOperator.class);
+      HBaseStore store = new HBaseStore();
+      thop.setStore(store);
       HBaseTupleCollector tc = dag.addOperator("tuplecollector", HBaseTupleCollector.class);
       dag.addStream("ss", thop.outputPort, tc.inputPort);
 
-      thop.setTableName("table1");
-      thop.setZookeeperQuorum("127.0.0.1");
-      thop.setZookeeperClientPort(2181);
+      thop.getStore().setTableName("table1");
+      thop.getStore().setZookeeperQuorum("127.0.0.1");
+      thop.getStore().setZookeeperClientPort(2181);
 
       LocalMode.Controller lc = lma.getController();
       lc.setHeartbeatMonitoringEnabled(false);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
index d0b3e94..4e6bb39 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
@@ -28,10 +28,13 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.internal.runners.statements.Fail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
@@ -59,17 +62,33 @@ public class HBasePOJOInputOperatorTest
       this.setTupleType( TestPOJO.class );
     }
   }
-  
+
+  public static class TestHBasePOJOInputOperator extends HBasePOJOInputOperator
+  {
+    @Override
+    public void setup(OperatorContext context)
+    {
+      try {
+        Thread.sleep(1000); // give the output operator time to write rows before this operator scans them
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+        throw new RuntimeException(e);
+      }
+      super.setup(context);
+    }
+  }
+
   private static final Logger logger = LoggerFactory.getLogger( HBasePOJOInputOperatorTest.class );
   private final int TUPLE_NUM = 1000;
+  private final long RUN_DURATION = 30000; // time in ms
   private HBaseStore store;
   private HBasePOJOPutOperator hbaseOutputOperator;
-  private HBasePOJOInputOperator hbaseInputOperator;
+  private TestHBasePOJOInputOperator hbaseInputOperator;
   
   @Before
   public void prepare() throws Exception
   {
-    hbaseInputOperator = new HBasePOJOInputOperator();
+    hbaseInputOperator = new TestHBasePOJOInputOperator();
     hbaseOutputOperator = new HBasePOJOPutOperator();
     setupOperators();
     HBaseUtil.createTable( store.getConfiguration(), store.getTableName());
@@ -104,6 +123,7 @@ public class HBasePOJOInputOperatorTest
     hbaseOutputOperator = dag.addOperator( OPERATOR.HBASEOUTPUT.name(), hbaseOutputOperator );
 
     hbaseInputOperator = dag.addOperator(OPERATOR.HBASEINPUT.name(), hbaseInputOperator);
+    dag.setOutputPortAttribute(hbaseInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, TestPOJO.class);
     
     
     TupleCacheOutputOperator output = dag.addOperator(OPERATOR.OUTPUT.name(), TupleCacheOutputOperator.class);
@@ -120,6 +140,7 @@ public class HBasePOJOInputOperatorTest
     final LocalMode.Controller lc = lma.getController();
     lc.runAsync();
 
+    long start = System.currentTimeMillis();
     //generator.doneLatch.await();
     while(true)
     {
@@ -128,10 +149,14 @@ public class HBasePOJOInputOperatorTest
         Thread.sleep(1000);
       }
       catch( Exception e ){}
-      
+      logger.info("Tuple row key: ", output.getReceivedTuples());
       logger.info( "Received tuple number {}, instance is {}.", output.getReceivedTuples() == null ? 0 : output.getReceivedTuples().size(), System.identityHashCode( output ) );
-      if( output.getReceivedTuples() != null && output.getReceivedTuples().size() == TUPLE_NUM )
+      if( output.getReceivedTuples() != null && output.getReceivedTuples().size() == TUPLE_NUM ) {
         break;
+      }
+      if(System.currentTimeMillis() - start > RUN_DURATION) {
+        throw new RuntimeException("Testcase taking too long");
+      }
     }
     
     lc.shutdown();
@@ -173,8 +198,6 @@ public class HBasePOJOInputOperatorTest
     hbaseInputOperator.setStore(store);
     hbaseOutputOperator.setStore(store);
     
-    hbaseInputOperator.setPojoTypeName( TestPOJO.class.getName() );
-    
     OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
         OPERATOR_ID, new AttributeMap.DefaultAttributeMap());
     hbaseInputOperator.setup(context);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
index e190841..254ef37 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java
@@ -57,9 +57,9 @@ public class HBaseScanOperatorTest
       HBaseTupleCollector tc = dag.addOperator("tuplecollector", HBaseTupleCollector.class);
       dag.addStream("ss", thop.outputPort, tc.inputPort);
 
-      thop.setTableName("table1");
-      thop.setZookeeperQuorum("127.0.0.1");
-      thop.setZookeeperClientPort(2181);
+      thop.getStore().setTableName("table1");
+      thop.getStore().setZookeeperQuorum("127.0.0.1");
+      thop.getStore().setZookeeperClientPort(2181);
 
       LocalMode.Controller lc = lma.getController();
       lc.setHeartbeatMonitoringEnabled(false);