Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:24 UTC

[36/50] [abbrv] incubator-apex-malhar git commit: MLHR-1748 #resolve Created concrete input and output operators for Redis Store. Added test cases for the same.

MLHR-1748 #resolve Created concrete input and output operators for Redis Store
Added test cases for the same.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a57a3d75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a57a3d75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a57a3d75

Branch: refs/heads/master
Commit: a57a3d756bafc1269c827a71d4fea549abdf65dd
Parents: f40ba34
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 15:52:25 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 12:07:36 2015 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   8 +-
 .../redis/AbstractRedisInputOperator.java       | 224 +++++++++++++++++-
 .../redis/RedisKeyValueInputOperator.java       |  55 +++++
 .../redis/RedisMapAsValueInputOperator.java     |  45 ++++
 .../contrib/redis/RedisPOJOInputOperator.java   | 204 ++++++++++++++++
 .../contrib/redis/RedisPOJOOutputOperator.java  | 155 +++++++++++++
 .../datatorrent/contrib/redis/RedisStore.java   |  27 +++
 .../contrib/redis/RedisInputOperatorTest.java   | 193 ++++++++++++++++
 .../contrib/redis/RedisPOJOOperatorTest.java    | 230 +++++++++++++++++++
 demos/machinedata/pom.xml                       |   2 +-
 10 files changed, 1138 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..50d7234 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -181,6 +181,12 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.8.1.1</version>
@@ -382,7 +388,7 @@
     <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
-      <version>2.2.1</version>
+      <version>2.5.1</version>
       <optional>true</optional>
     </dependency>
     <dependency>

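The jedis bump from 2.2.1 to 2.5.1 matters because the new input operators rely on the
cursor-based SCAN client API (ScanParams/ScanResult with a string cursor) that older jedis
releases do not expose. For context, a minimal sketch of that underlying API, assuming a
local Redis instance on the default port (class name and key printing are illustrative only):

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.ScanParams;
    import redis.clients.jedis.ScanResult;

    public class ScanSketch
    {
      public static void main(String[] args)
      {
        Jedis jedis = new Jedis("localhost", 6379);   // assumed local Redis instance
        ScanParams params = new ScanParams();
        params.count(100);                            // hint: roughly 100 keys per SCAN call
        String cursor = "0";
        do {
          ScanResult<String> page = jedis.scan(cursor, params);
          cursor = page.getStringCursor();            // becomes "0" again once the scan completes
          for (String key : page.getResult()) {
            System.out.println(key);
          }
        } while (!"0".equals(cursor));
        jedis.disconnect();
      }
    }
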
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index ff7a9a5..7f79bd0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -15,11 +15,22 @@
  */
 package com.datatorrent.contrib.redis;
 
-import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
 
 /**
  * This is the base implementation of a Redis input operator.
- * <p></p>
+ * 
  * @displayName Abstract Redis Input
  * @category Input
  * @tags redis, key value
@@ -27,6 +38,213 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
  * @param <T> The tuple type.
  * @since 0.9.3
  */
-public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore>
+public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
 {
+  protected transient List<String> keys = new ArrayList<String>();
+  protected transient Integer scanOffset;
+  protected transient ScanParams scanParameters;
+  private transient boolean scanComplete;
+  private transient Integer backupOffset;
+  private int scanCount;
+  private transient boolean replay;
+
+  @NotNull
+  private IdempotentStorageManager idempotentStorageManager;
+
+  private transient OperatorContext context;
+  private transient long currentWindowId;
+  private transient Integer sleepTimeMillis;
+  private transient Integer scanCallsInCurrentWindow;
+  private RecoveryState recoveryState;
+
+  /*
+   * RecoveryState holds the last offset processed in a window and the number of
+   * times scanKeys was invoked in that window. The number of scanKeys calls must
+   * be captured because the offset returned by a scanKeys call is not always
+   * monotonically increasing. Storing both the offset and the number of scans
+   * done per window guarantees idempotency for each window.
+   */
+  public static class RecoveryState implements Serializable
+  {
+    public Integer scanOffsetAtBeginWindow, numberOfScanCallsInWindow;
+  }
+
+  public AbstractRedisInputOperator()
+  {
+    scanCount = 100;
+    recoveryState = new RecoveryState();
+    recoveryState.scanOffsetAtBeginWindow = 0;
+    recoveryState.numberOfScanCallsInWindow = 0;
+    setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    scanCallsInCurrentWindow = 0;
+    replay = false;
+    if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) {
+      replay(windowId);
+    }
+  }
+
+  private void replay(long windowId)
+  {
+    try {
+      if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+        // Begin offset for this window is recovery offset stored for the last
+        // window
+        RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
+        recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
+      }
+
+      RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
+      recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
+      if (recoveryState.scanOffsetAtBeginWindow != null) {
+        scanOffset = recoveryState.scanOffsetAtBeginWindow;
+      }
+      replay = true;
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
+  {
+    long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId());
+    if (windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) {
+      return false;
+    }
+    return true;
+  }
+
+  private void scanKeysFromOffset()
+  {
+    if (!scanComplete) {
+      if (replay && scanCallsInCurrentWindow >= recoveryState.numberOfScanCallsInWindow) {
+        try {
+          Thread.sleep(sleepTimeMillis);
+        } catch (InterruptedException e) {
+          DTThrowable.rethrow(e);
+        }
+        return;
+      }
+
+      ScanResult<String> result = store.ScanKeys(scanOffset, scanParameters);
+      backupOffset = scanOffset;
+      scanOffset = Integer.parseInt(result.getStringCursor());
+      if (scanOffset == 0) {
+        // Redis store returns 0 after all data is read
+        scanComplete = true;
+
+        // point scanOffset to the end in this case for reading any new tuples
+        scanOffset = backupOffset + result.getResult().size();
+      }
+      keys = result.getResult();
+    }
+    scanCallsInCurrentWindow++;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    sleepTimeMillis = context.getValue(context.SPIN_MILLIS);
+    getIdempotentStorageManager().setup(context);
+    this.context = context;
+    scanOffset = 0;
+    scanComplete = false;
+    scanParameters = new ScanParams();
+    scanParameters.count(scanCount);
+    // For the first window after a checkpoint, windowId - 1 has no recovery
+    // offset stored in the idempotentStorageManager. However, the recovery
+    // state is non-transient, so its offset is restored from the checkpoint.
+    scanOffset = recoveryState.scanOffsetAtBeginWindow;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    while (replay && scanCallsInCurrentWindow < recoveryState.numberOfScanCallsInWindow) {
+      // If fewer scan calls happened in this window than were recorded,
+      // keep scanning until the recovery offset is reached
+      scanKeysFromOffset();
+      processTuples();
+    }
+    super.endWindow();
+    recoveryState.scanOffsetAtBeginWindow = scanOffset;
+    recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow;
+
+    if (currentWindowId > getIdempotentStorageManager().getLargestRecoveryWindow()) {
+      try {
+        getIdempotentStorageManager().save(recoveryState, context.getId(), currentWindowId);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    getIdempotentStorageManager().teardown();
+  }
+
+  /*
+   * Get the number of keys to read for each Redis key scan
+   */
+  public int getScanCount()
+  {
+    return scanCount;
+  }
+
+  /*
+   * Set the number of keys to read for each Redis key scan
+   */
+  public void setScanCount(int scanCount)
+  {
+    this.scanCount = scanCount;
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    scanKeysFromOffset();
+    processTuples();
+  }
+
+  abstract public void processTuples();
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      getIdempotentStorageManager().deleteUpTo(context.getId(), windowId);
+    } catch (IOException e) {
+      throw new RuntimeException("committing", e);
+    }
+  }
+
+  /*
+   * Get the idempotent storage manager instance
+   */
+  public IdempotentStorageManager getIdempotentStorageManager()
+  {
+    return idempotentStorageManager;
+  }
+
+  /*
+   * Set the idempotent storage manager instance
+   */
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+  {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
 }

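The contract this base class defines for concrete subclasses is small: emitTuples() calls
scanKeysFromOffset(), which refreshes the protected keys list, and then processTuples(), which
must emit something for those keys and clear the list. A hypothetical minimal subclass, just to
illustrate the pattern (the shipped RedisKeyValueInputOperator below is the real equivalent):

    // Hypothetical illustration only, not part of this commit.
    public class RedisStringValueInputOperator extends AbstractRedisInputOperator<String>
    {
      @Override
      public void processTuples()
      {
        for (String key : keys) {
          Object value = store.get(key);      // plain string values; hash values need getMap()
          if (value != null) {
            outputPort.emit(value.toString());
          }
        }
        keys.clear();                         // consumed; the next scan call refills the list
      }
    }
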
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
new file mode 100644
index 0000000..8f419bd
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of a Redis input operator for fetching key-value
+ * pairs stored in Redis. It takes in keys to fetch and emits the corresponding
+ * (key, value) pairs; the value data type is String in this case.
+ * 
+ * @displayName Redis Input Operator for Key Value pair
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, String>>
+{
+  private List<Object> keysObjectList = new ArrayList<Object>();
+
+  @Override
+  public void processTuples()
+  {
+    keysObjectList = new ArrayList<Object>(keys);
+    if (keysObjectList.size() > 0) {
+
+      List<Object> allValues = store.getAll(keysObjectList);
+      for (int i = 0; i < allValues.size() && i < keys.size(); i++) {
+        if (allValues.get(i) == null) {
+          outputPort.emit(new KeyValPair<String, String>(keys.get(i), null));
+        } else {
+          outputPort.emit(new KeyValPair<String, String>(keys.get(i), allValues.get(i).toString()));
+        }
+      }
+      keys.clear();
+      keysObjectList.clear();
+    }
+  }
+}

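Wiring this operator into an application mirrors what RedisInputOperatorTest does further down:
give it a RedisStore and connect its output port to a consumer. A condensed sketch, assuming a
DAG named dag and a downstream operator named consumer with a matching input port:

    RedisStore store = new RedisStore();   // defaults to a local Redis instance, as in the tests

    RedisKeyValueInputOperator input =
        dag.addOperator("redisInput", new RedisKeyValueInputOperator());
    input.setStore(store);
    input.setScanCount(100);               // number of keys requested per SCAN call
    dag.addStream("redisKeyValues", input.outputPort, consumer.inputPort);
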
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
new file mode 100644
index 0000000..66ef582
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.Map;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of a Redis input operator. It takes in keys to
+ * fetch and emits the values stored as maps in Redis, i.e. keys whose value
+ * data type in Redis is a hash.
+ * 
+ * @displayName Redis Input Operator for Map
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+
+public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Map<String, String>>>
+{
+  @Override
+  public void processTuples()
+  {
+    for (String key : keys) {
+      if (store.getType(key).equals("hash")) {
+        Map<String, String> mapValue = store.getMap(key);
+        outputPort.emit(new KeyValPair<String, Map<String, String>>(key, mapValue));
+      }
+    }
+    keys.clear();
+  }
+}

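This operator only emits keys whose Redis type is "hash", i.e. values written with HMSET. A small
sketch of seeding such a value through RedisStore, much as the POJO operator test below does (the
key and field names here are made up for illustration, and the usual Map/HashMap imports are
assumed):

    RedisStore seed = new RedisStore();
    seed.connect();

    Map<String, String> fields = new HashMap<String, String>();
    fields.put("city", "Pune");
    fields.put("visits", "42");
    seed.put("test_user_1", fields);       // stored as a Redis hash
    seed.disconnect();

    // RedisMapAsValueInputOperator would then emit for this key:
    //   KeyValPair<String, Map<String, String>>("test_user_1", {city=Pune, visits=42})
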
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
new file mode 100644
index 0000000..5a73e61
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
@@ -0,0 +1,204 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.SetterDouble;
+import com.datatorrent.lib.util.PojoUtils.SetterFloat;
+import com.datatorrent.lib.util.PojoUtils.SetterInt;
+import com.datatorrent.lib.util.PojoUtils.SetterLong;
+import com.datatorrent.lib.util.PojoUtils.SetterShort;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is a Redis input operator which scans all keys in the Redis store. It
+ * converts each value stored as a map into a Plain Old Java Object (POJO) and
+ * outputs a KeyValPair with the POJO as the value.
+ * <p>
+ * This input adapter reads entries stored in Redis as (key, map) and emits
+ * (key, POJO) pairs as tuples.
+ * </p>
+ *
+ * @displayName Redis POJO Input Operator
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+@Evolving
+public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Object>>
+{
+  protected final Map<String, Object> map = new HashMap<String, Object>();
+  private ArrayList<FieldInfo> dataColumns;
+  private transient ArrayList<Object> setters;
+  private boolean isFirstTuple = true;
+  private String outputClass;
+  private Class<?> objectClass;
+
+  public RedisPOJOInputOperator()
+  {
+    super();
+    setters = new ArrayList<Object>();
+  }
+
+  @SuppressWarnings("unchecked")
+  private Object convertMapToObject(Map<String, String> tuple)
+  {
+    try {
+      Object mappedObject = objectClass.newInstance();
+      for (int i = 0; i < dataColumns.size(); i++) {
+        final SupportType type = dataColumns.get(i).getType();
+        final String columnName = dataColumns.get(i).getColumnName();
+
+        if (i < setters.size()) {
+          String value = tuple.get(columnName);
+          switch (type) {
+            case STRING:
+              ((Setter<Object, String>) setters.get(i)).set(mappedObject, value);
+              break;
+            case BOOLEAN:
+              ((SetterBoolean) setters.get(i)).set(mappedObject, Boolean.parseBoolean(value));
+              break;
+            case SHORT:
+              ((SetterShort) setters.get(i)).set(mappedObject, Short.parseShort(value));
+              break;
+            case INTEGER:
+              ((SetterInt) setters.get(i)).set(mappedObject, Integer.parseInt(value));
+              break;
+            case LONG:
+              ((SetterLong) setters.get(i)).set(mappedObject, Long.parseLong(value));
+              break;
+            case FLOAT:
+              ((SetterFloat) setters.get(i)).set(mappedObject, Float.parseFloat(value));
+              break;
+            case DOUBLE:
+              ((SetterDouble) setters.get(i)).set(mappedObject, Double.parseDouble(value));
+              break;
+            default:
+              break;
+          }
+        }
+      }
+      return mappedObject;
+    } catch (Exception e) {
+      DTThrowable.wrapIfChecked(e);
+    }
+    return null;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+  }
+
+  public void processFirstTuple(Map<String, String> value) throws ClassNotFoundException
+  {
+    objectClass = Class.forName(getOutputClass());
+
+    final int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
+      final Object setter;
+      switch (type) {
+        case STRING:
+          setter = PojoUtils.createSetter(objectClass, getterExpression, String.class);
+          break;
+        case BOOLEAN:
+          setter = PojoUtils.createSetterBoolean(objectClass, getterExpression);
+          break;
+        case SHORT:
+          setter = PojoUtils.createSetterShort(objectClass, getterExpression);
+          break;
+        case INTEGER:
+          setter = PojoUtils.createSetterInt(objectClass, getterExpression);
+          break;
+        case LONG:
+          setter = PojoUtils.createSetterLong(objectClass, getterExpression);
+          break;
+        case FLOAT:
+          setter = PojoUtils.createSetterFloat(objectClass, getterExpression);
+          break;
+        case DOUBLE:
+          setter = PojoUtils.createSetterDouble(objectClass, getterExpression);
+          break;
+        default:
+          setter = PojoUtils.createSetter(objectClass, getterExpression, Object.class);
+          break;
+      }
+      setters.add(setter);
+    }
+  }
+
+  @Override
+  public void processTuples()
+  {
+    for (String key : keys) {
+      if (store.getType(key).equals("hash")) {
+        Map<String, String> mapValue = store.getMap(key);
+        if (isFirstTuple) {
+          try {
+            processFirstTuple(mapValue);
+          } catch (ClassNotFoundException e) {
+            DTThrowable.rethrow(e);
+          }
+        }
+        isFirstTuple = false;
+        outputPort.emit(new KeyValPair<String, Object>(key, convertMapToObject(mapValue)));
+      }
+    }
+    keys.clear();
+  }
+
+  /*
+   * Output class type
+   */
+  public String getOutputClass()
+  {
+    return outputClass;
+  }
+
+  public void setOutputClass(String outputClass)
+  {
+    this.outputClass = outputClass;
+  }
+
+  /*
+   * An arraylist of data column names to be set in Redis store as a Map. Gets
+   * column names, column expressions and column data types
+   */
+  public ArrayList<FieldInfo> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+}

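Configuring the POJO input operator means describing how hash fields map onto POJO fields, which
is exactly what RedisPOJOOperatorTest does below. A condensed sketch, reusing the test's TestClass
POJO (any POJO with matching setters would do) and assuming a DAG named dag:

    RedisPOJOInputOperator input =
        dag.addOperator("redisPojoInput", new RedisPOJOInputOperator());

    ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
    // FieldInfo(column name in the Redis hash, POJO field expression, field type)
    fields.add(new FieldInfo("Column1", "stringValue", SupportType.STRING));
    fields.add(new FieldInfo("Column2", "intValue", SupportType.INTEGER));

    input.setDataColumns(fields);
    input.setOutputClass(TestClass.class.getName());   // class instantiated for every emitted tuple
    input.setStore(new RedisStore());
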
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
new file mode 100644
index 0000000..8966248
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+/**
+ * This is a Redis output operator which takes a key and a corresponding Plain
+ * Old Java Object as input, and writes a map out to Redis based on the field
+ * expressions provided.
+ * <p>
+ * This output adapter takes (key, POJO) pairs as tuples and writes each one to
+ * the Redis store under its key, with the value stored as a map of the
+ * object's attributes. Note: a Redis output operator should never use the
+ * passthrough method, because it begins a transaction at beginWindow and
+ * commits it at endWindow, and a transaction in Redis blocks all other
+ * clients.
+ * </p>
+ *
+ * @displayName Redis POJO Output Operator
+ * @category Store
+ * @tags output operator, key value
+ *
+ */
+public class RedisPOJOOutputOperator extends AbstractRedisAggregateOutputOperator<KeyValPair<String, Object>>
+{
+  protected final Map<String, Object> map = new HashMap<String, Object>();
+  private ArrayList<FieldInfo> dataColumns;
+  private transient ArrayList<Object> getters;
+  private boolean isFirstTuple = true;
+
+  public RedisPOJOOutputOperator()
+  {
+    super();
+    getters = new ArrayList<Object>();
+  }
+
+  @Override
+  public void storeAggregate()
+  {
+    for (Entry<String, Object> entry : map.entrySet()) {
+
+      Map<String, String> mapObject = convertObjectToMap(entry.getValue());
+      store.put(entry.getKey(), mapObject);
+    }
+  }
+
+  private Map<String, String> convertObjectToMap(Object tuple)
+  {
+
+    Map<String, String> mappedObject = new HashMap<String, String>();
+    for (int i = 0; i < dataColumns.size(); i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String columnName = dataColumns.get(i).getColumnName();
+
+      if (i < getters.size()) {
+        Getter<Object, Object> obj = (Getter<Object, Object>) (getters.get(i));
+
+        Object value = obj.get(tuple);
+        mappedObject.put(columnName, value.toString());
+      }
+    }
+
+    return mappedObject;
+  }
+
+  public void processFirstTuple(KeyValPair<String, Object> tuple)
+  {
+    // Create getters using first value entry in map
+    // Entry<String, Object> entry= tuple.entrySet().iterator().next();
+    Object value = tuple.getValue();
+
+    final Class<?> fqcn = value.getClass();
+    final int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
+      final Object getter;
+      switch (type) {
+        case STRING:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+          break;
+        case BOOLEAN:
+          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+          break;
+        case SHORT:
+          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+          break;
+        case INTEGER:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, type.getJavaType());
+          break;
+        case LONG:
+          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+          break;
+        case FLOAT:
+          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+          break;
+        case DOUBLE:
+          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+          break;
+        default:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+          break;
+      }
+      getters.add(getter);
+    }
+  }
+
+  @Override
+  public void processTuple(KeyValPair<String, Object> tuple)
+  {
+    if (isFirstTuple) {
+      processFirstTuple(tuple);
+    }
+
+    isFirstTuple = false;
+    map.put(tuple.getKey(), tuple.getValue());
+  }
+
+  /*
+   * An arraylist of data column names to be set in Redis store as a Map. Gets
+   * column names, column expressions and column data types
+   */
+  public ArrayList<FieldInfo> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+}

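The output operator is configured the same way, with FieldInfo entries describing which POJO
fields become which hash columns; the expressions may be field names or getter calls, as the
POJO test below shows. A condensed sketch under the same assumptions as above:

    RedisPOJOOutputOperator output =
        dag.addOperator("redisPojoOutput", new RedisPOJOOutputOperator());

    ArrayList<FieldInfo> columns = new ArrayList<FieldInfo>();
    columns.add(new FieldInfo("column1", "intValue", SupportType.INTEGER));
    columns.add(new FieldInfo("column2", "getStringValue()", SupportType.STRING));

    output.setDataColumns(columns);
    output.setStore(new RedisStore());
    // each incoming KeyValPair<String, Object> is written as a Redis hash:
    //   key -> {column1=<intValue>, column2=<stringValue>}
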
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
index ea8e26b..2acc1d5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
 import redis.clients.jedis.Transaction;
 
 import com.datatorrent.lib.db.TransactionableKeyValueStore;
@@ -181,6 +183,26 @@ public class RedisStore implements TransactionableKeyValueStore
     return jedis.get(key.toString());
   }
 
+  public String getType(String key)
+  {
+    return jedis.type(key);
+  }
+
+  /**
+   * Gets the stored map for the given key, when the value data type is a map stored with HMSET.
+   *
+   * @param key
+   * @return hashmap stored for the key.
+   */
+  public Map<String, String> getMap(Object key)
+  {
+    if (isInTransaction()) {
+      throw new RuntimeException("Cannot call get when in redis transaction");
+    }
+    return jedis.hgetAll(key.toString());
+  }
+
+
   /**
    * Gets all the values given the keys.
    * Note that it does NOT work with hash values or list values
@@ -255,6 +277,11 @@ public class RedisStore implements TransactionableKeyValueStore
     }
   }
 
+  public ScanResult<String> ScanKeys(Integer offset, ScanParams params)
+  {
+    return jedis.scan(offset.toString(), params);
+  }
+
   /**
    * Calls hincrbyfloat on the redis store.
    *

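The three RedisStore additions (getType, getMap, ScanKeys) are what the new input operators build
on. Used directly, they look roughly like this (a sketch assuming a reachable Redis instance and
the jedis/Map imports used by the tests below):

    RedisStore store = new RedisStore();
    store.connect();

    ScanParams params = new ScanParams();
    params.count(100);
    ScanResult<String> page = store.ScanKeys(0, params);   // first page, cursor 0

    for (String key : page.getResult()) {
      if ("hash".equals(store.getType(key))) {
        Map<String, String> value = store.getMap(key);     // HGETALL on the key
        System.out.println(key + " -> " + value);
      }
    }
    store.disconnect();
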
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
new file mode 100644
index 0000000..08fb294
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisInputOperatorTest
+{
+  private RedisStore operatorStore;
+  private RedisStore testStore;
+
+  public static class CollectorModule extends BaseOperator
+  {
+    volatile static List<KeyValPair<String, String>> resultMap = new ArrayList<KeyValPair<String, String>>();
+    static long resultCount = 0;
+
+    public final transient DefaultInputPort<KeyValPair<String, String>> inputPort = new DefaultInputPort<KeyValPair<String, String>>()
+    {
+      @Override
+      public void process(KeyValPair<String, String> tuple)
+      {
+        resultMap.add(tuple);
+        resultCount++;
+      }
+    };
+  }
+
+  @Test
+  public void testInputOperator() throws IOException
+  {
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+    ScanParams params = new ScanParams();
+    params.count(1);
+
+    testStore.put("test_abc", "789");
+    testStore.put("test_def", "456");
+    testStore.put("test_ghi", "123");
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      DAG dag = lma.getDAG();
+
+      RedisKeyValueInputOperator inputOperator = dag.addOperator("input", new RedisKeyValueInputOperator());
+      final CollectorModule collector = dag.addOperator("collector", new CollectorModule());
+
+      inputOperator.setStore(operatorStore);
+      dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+      final LocalMode.Controller lc = lma.getController();
+
+      new Thread("LocalClusterController")
+      {
+        @Override
+        public void run()
+        {
+          long startTms = System.currentTimeMillis();
+          long timeout = 50000L;
+          try {
+            Thread.sleep(1000);
+            while (System.currentTimeMillis() - startTms < timeout) {
+              if (CollectorModule.resultMap.size() < 3) {
+                Thread.sleep(10);
+              } else {
+                break;
+              }
+            }
+          } catch (InterruptedException ex) {
+          }
+          lc.shutdown();
+        }
+      }.start();
+
+      lc.run();
+
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_abc", "789")));
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_def", "456")));
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_ghi", "123")));
+    } finally {
+      for (KeyValPair<String, String> entry : CollectorModule.resultMap) {
+        testStore.remove(entry.getKey());
+      }
+      testStore.disconnect();
+    }
+  }
+
+  @Test
+  public void testRecoveryAndIdempotency() throws Exception
+  {
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+    ScanParams params = new ScanParams();
+    params.count(1);
+
+    testStore.put("test_abc", "789");
+    testStore.put("test_def", "456");
+    testStore.put("test_ghi", "123");
+
+    RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator();
+    operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    
+    operator.setStore(operatorStore);
+    operator.setScanCount(1);
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+    operator.outputPort.setSink(sink);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+    try {
+      operator.setup(context);
+      operator.beginWindow(1);
+      operator.emitTuples();
+      operator.endWindow();
+
+      int numberOfMessagesInWindow1 = sink.collectedTuples.size();
+      sink.collectedTuples.clear();
+
+      operator.beginWindow(2);
+      operator.emitTuples();
+      operator.endWindow();
+      int numberOfMessagesInWindow2 = sink.collectedTuples.size();
+      sink.collectedTuples.clear();
+
+      // failure and then re-deployment of operator
+      // Re-instantiating to reset values
+      operator = new RedisKeyValueInputOperator();
+      operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+      operator.setStore(operatorStore);
+      operator.setScanCount(1);
+      operator.outputPort.setSink(sink);
+      operator.setup(context);
+
+      Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+      operator.beginWindow(1);
+      operator.emitTuples();
+      operator.emitTuples();
+      operator.endWindow();
+
+      Assert.assertEquals("num of messages in window 1", numberOfMessagesInWindow1, sink.collectedTuples.size());
+
+      sink.collectedTuples.clear();
+      operator.beginWindow(2);
+      operator.emitTuples();
+      operator.endWindow();
+      Assert.assertEquals("num of messages in window 2", numberOfMessagesInWindow2, sink.collectedTuples.size());
+    } finally {
+      for (Object e : sink.collectedTuples) {
+        KeyValPair<String, String> entry = (KeyValPair<String, String>) e;
+        testStore.remove(entry.getKey());
+      }
+      sink.collectedTuples.clear();
+      operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5);
+      operator.teardown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
new file mode 100644
index 0000000..7792b5a
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
@@ -0,0 +1,230 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.redis.RedisInputOperatorTest.CollectorModule;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisPOJOOperatorTest
+{
+  private RedisStore operatorStore;
+  private RedisStore testStore;
+
+  public static class TestClass
+  {
+    private Integer intValue;
+    private String stringValue;
+
+    public TestClass()
+    {
+    }
+
+    public TestClass(int v1, String v2)
+    {
+      intValue = v1;
+      stringValue = v2;
+    }
+
+    public Integer getIntValue()
+    {
+      return intValue;
+    }
+
+    public void setIntValue(int intValue)
+    {
+      this.intValue = intValue;
+    }
+
+    public String getStringValue()
+    {
+      return stringValue;
+    }
+
+    public void setStringValue(String stringValue)
+    {
+      this.stringValue = stringValue;
+    }
+  }
+
+  @Test
+  public void testOutputOperator() throws IOException
+  {
+    this.operatorStore = new RedisStore();
+
+    operatorStore.connect();
+    String appId = "test_appid";
+    int operatorId = 0;
+
+    operatorStore.removeCommittedWindowId(appId, operatorId);
+    operatorStore.disconnect();
+
+    RedisPOJOOutputOperator outputOperator = new RedisPOJOOutputOperator();
+
+    ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+    fields.add(new FieldInfo("column1", "intValue", SupportType.INTEGER));
+    fields.add(new FieldInfo("column2", "getStringValue()", SupportType.STRING));
+
+    outputOperator.setDataColumns(fields);
+
+    try {
+      com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_ID, appId);
+
+      outputOperator.setStore(operatorStore);
+      outputOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes));
+      outputOperator.beginWindow(101);
+
+      KeyValPair<String, Object> keyVal = new KeyValPair<String, Object>("test_abc1", new TestClass(1, "abc"));
+
+      outputOperator.input.process(keyVal);
+
+      outputOperator.endWindow();
+
+      outputOperator.teardown();
+
+      operatorStore.connect();
+
+      Map<String, String> out = operatorStore.getMap("test_abc1");
+      Assert.assertEquals("1", out.get("column1"));
+      Assert.assertEquals("abc", out.get("column2"));
+    } finally {
+      operatorStore.remove("test_abc1");
+      operatorStore.disconnect();
+    }
+  }
+
+  public static class ObjectCollectorModule extends BaseOperator
+  {
+    volatile static Map<String, Object> resultMap = new HashMap<String, Object>();
+    static long resultCount = 0;
+
+    public final transient DefaultInputPort<KeyValPair<String, Object>> inputPort = new DefaultInputPort<KeyValPair<String, Object>>()
+    {
+      @Override
+      public void process(KeyValPair<String, Object> tuple)
+      {
+        resultMap.put(tuple.getKey(), tuple.getValue());
+        resultCount++;
+      }
+    };
+  }
+
+  @Test
+  public void testInputOperator() throws IOException
+  {
+    @SuppressWarnings("unused")
+    Class<?> clazz = org.codehaus.janino.CompilerFactory.class;
+
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+    ScanParams params = new ScanParams();
+    params.count(100);
+
+    Map<String, String> value = new HashMap<String, String>();
+    value.put("Column1", "abc");
+    value.put("Column2", "1");
+
+    Map<String, String> value1 = new HashMap<String, String>();
+    value1.put("Column1", "def");
+    value1.put("Column2", "2");
+
+    Map<String, String> value2 = new HashMap<String, String>();
+    value2.put("Column1", "ghi");
+    value2.put("Column2", "3");
+
+    testStore.put("test_abc_in", value);
+    testStore.put("test_def_in", value1);
+    testStore.put("test_ghi_in", value2);
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      DAG dag = lma.getDAG();
+
+      RedisPOJOInputOperator inputOperator = dag.addOperator("input", new RedisPOJOInputOperator());
+      final ObjectCollectorModule collector = dag.addOperator("collector", new ObjectCollectorModule());
+
+      ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+      fields.add(new FieldInfo("Column1", "stringValue", SupportType.STRING));
+      fields.add(new FieldInfo("Column2", "intValue", SupportType.INTEGER));
+
+      inputOperator.setDataColumns(fields);
+      inputOperator.setOutputClass(TestClass.class.getName());
+
+      inputOperator.setStore(operatorStore);
+      dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+      final LocalMode.Controller lc = lma.getController();
+
+      new Thread("LocalClusterController")
+      {
+        @Override
+        public void run()
+        {
+          long startTms = System.currentTimeMillis();
+          long timeout = 10000L;
+          try {
+            Thread.sleep(1000);
+            while (System.currentTimeMillis() - startTms < timeout) {
+              if (ObjectCollectorModule.resultMap.size() < 3) {
+                Thread.sleep(10);
+              } else {
+                break;
+              }
+            }
+          } catch (InterruptedException ex) {
+          }
+          lc.shutdown();
+        }
+      }.start();
+
+      lc.run();
+
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_abc_in"));
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_def_in"));
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_ghi_in"));
+
+      TestClass a = (TestClass) ObjectCollectorModule.resultMap.get("test_abc_in");
+      Assert.assertNotNull(a);
+      Assert.assertEquals("abc", a.stringValue);
+      Assert.assertEquals("1", a.intValue.toString());
+    } finally {
+      for (KeyValPair<String, String> entry : CollectorModule.resultMap) {
+        testStore.remove(entry.getKey());
+      }
+      testStore.disconnect();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/demos/machinedata/pom.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/pom.xml b/demos/machinedata/pom.xml
index 1f3f075..3498d0d 100644
--- a/demos/machinedata/pom.xml
+++ b/demos/machinedata/pom.xml
@@ -31,7 +31,7 @@
     <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
-      <version>2.2.1</version>
+      <version>2.5.1</version>
     </dependency>
     <dependency>
       <groupId>javax.mail</groupId>