You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:24 UTC
[36/50] [abbrv] incubator-apex-malhar git commit: MLHR-1748 #resolve
Created concrete input and output operators for Redis Store. Added test cases
for the same.
MLHR-1748 #resolve Created concrete input and output operators for Redis Store.
Added test cases for the same.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a57a3d75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a57a3d75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a57a3d75
Branch: refs/heads/master
Commit: a57a3d756bafc1269c827a71d4fea549abdf65dd
Parents: f40ba34
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 15:52:25 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 12:07:36 2015 -0700
----------------------------------------------------------------------
contrib/pom.xml | 8 +-
.../redis/AbstractRedisInputOperator.java | 224 +++++++++++++++++-
.../redis/RedisKeyValueInputOperator.java | 55 +++++
.../redis/RedisMapAsValueInputOperator.java | 45 ++++
.../contrib/redis/RedisPOJOInputOperator.java | 204 ++++++++++++++++
.../contrib/redis/RedisPOJOOutputOperator.java | 155 +++++++++++++
.../datatorrent/contrib/redis/RedisStore.java | 27 +++
.../contrib/redis/RedisInputOperatorTest.java | 193 ++++++++++++++++
.../contrib/redis/RedisPOJOOperatorTest.java | 230 +++++++++++++++++++
demos/machinedata/pom.xml | 2 +-
10 files changed, 1138 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..50d7234 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -181,6 +181,12 @@
<dependencies>
<dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
@@ -382,7 +388,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
- <version>2.2.1</version>
+ <version>2.5.1</version>
<optional>true</optional>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index ff7a9a5..7f79bd0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -15,11 +15,22 @@
*/
package com.datatorrent.contrib.redis;
-import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
/**
* This is the base implementation of a Redis input operator.
- * <p></p>
+ *
* @displayName Abstract Redis Input
* @category Input
* @tags redis, key value
@@ -27,6 +38,213 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
* @param <T> The tuple type.
* @since 0.9.3
*/
-public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore>
+public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
{
+ protected transient List<String> keys = new ArrayList<String>();
+ protected transient Integer scanOffset;
+ protected transient ScanParams scanParameters;
+ private transient boolean scanComplete;
+ private transient Integer backupOffset;
+ private int scanCount;
+ private transient boolean replay;
+
+ @NotNull
+ private IdempotentStorageManager idempotentStorageManager;
+
+ private transient OperatorContext context;
+ private transient long currentWindowId;
+ private transient Integer sleepTimeMillis;
+ private transient Integer scanCallsInCurrentWindow;
+ private RecoveryState recoveryState;
+
+ /*
+ * Recovery state stored for a window: the scan offset recorded for the window
+ * and the number of times ScanKeys was invoked in that window. The number of
+ * calls to ScanKeys has to be captured because the last offset returned by a
+ * ScanKeys call is not always monotonically increasing. Storing the offset and
+ * the number of scans for each window guarantees idempotency for each window.
+ */
+ public static class RecoveryState implements Serializable
+ {
+ public Integer scanOffsetAtBeginWindow, numberOfScanCallsInWindow;
+ }
+
+  /**
+   * Creates the operator with a scan count of 100, a recovery state whose
+   * offset and scan-call counter are both zero, and a no-op idempotent storage
+   * manager.
+   */
+  public AbstractRedisInputOperator()
+  {
+    scanCount = 100;
+
+    RecoveryState initialState = new RecoveryState();
+    initialState.scanOffsetAtBeginWindow = 0;
+    initialState.numberOfScanCallsInWindow = 0;
+    recoveryState = initialState;
+
+    setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
+  }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ scanCallsInCurrentWindow = 0;
+ replay = false;
+ if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) {
+ replay(windowId);
+ }
+ }
+
+ private void replay(long windowId)
+ {
+ try {
+ if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+ // Begin offset for this window is recovery offset stored for the last
+ // window
+ RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
+ recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
+ }
+
+ RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
+ recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
+ if (recoveryState.scanOffsetAtBeginWindow != null) {
+ scanOffset = recoveryState.scanOffsetAtBeginWindow;
+ }
+ replay = true;
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+  /**
+   * Tells whether the given window lies between the first and the last window
+   * id reported by the idempotent storage manager for this operator.
+   * NOTE(review): this is a range check only; it presumes getWindowIds returns
+   * the ids in ascending order - confirm against the storage manager.
+   *
+   * @param windowId id of the window to look for
+   * @return true when the id is within the reported range, false otherwise
+   *         (including when no window ids are reported)
+   * @throws IOException when the window ids cannot be read
+   */
+  private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
+  {
+    long[] storedWindows = getIdempotentStorageManager().getWindowIds(context.getId());
+    if (storedWindows.length == 0) {
+      return false;
+    }
+    long first = storedWindows[0];
+    long last = storedWindows[storedWindows.length - 1];
+    return windowId >= first && windowId <= last;
+  }
+
+  /**
+   * Performs one key scan starting at scanOffset and stores the returned keys
+   * in keys, unless the scan was already completed.
+   * <p>
+   * While replaying, no more scan calls are made than were recorded for the
+   * window; once that number is reached the method only sleeps for
+   * sleepTimeMillis and returns without counting a scan call.
+   * </p>
+   */
+  private void scanKeysFromOffset()
+  {
+    if (!scanComplete) {
+      if (replay && scanCallsInCurrentWindow >= recoveryState.numberOfScanCallsInWindow) {
+        try {
+          Thread.sleep(sleepTimeMillis);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so code further up the stack can still observe it
+          Thread.currentThread().interrupt();
+          DTThrowable.rethrow(e);
+        }
+        return;
+      }
+
+      ScanResult<String> result = store.ScanKeys(scanOffset, scanParameters);
+      backupOffset = scanOffset;
+      scanOffset = Integer.parseInt(result.getStringCursor());
+      if (scanOffset == 0) {
+        // Redis store returns 0 after all data is read
+        scanComplete = true;
+
+        // point scanOffset to the end in this case for reading any new tuples
+        scanOffset = backupOffset + result.getResult().size();
+      }
+      keys = result.getResult();
+    }
+    scanCallsInCurrentWindow++;
+  }
+
+  /**
+   * Sets up the parent operator and the idempotent storage manager, reads the
+   * spin interval from the context, creates the scan parameters from scanCount
+   * and positions the scan at the offset held in the recovery state.
+   *
+   * @param context operator context
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    sleepTimeMillis = context.getValue(OperatorContext.SPIN_MILLIS);
+    getIdempotentStorageManager().setup(context);
+    this.context = context;
+
+    scanComplete = false;
+    ScanParams params = new ScanParams();
+    params.count(scanCount);
+    scanParameters = params;
+
+    // For the 1st window after checkpoint, windowID - 1 would not have recovery
+    // offset stored in idempotentStorageManager. The recovery state is
+    // non-transient, so the scan starts from the offset it holds.
+    scanOffset = recoveryState.scanOffsetAtBeginWindow;
+  }
+
+  /**
+   * Finishes the window. During replay the remaining recorded scan calls are
+   * made first. Afterwards the current offset and the number of scan calls are
+   * written to the recovery state, which is saved through the idempotent
+   * storage manager when the window is newer than its largest recovery window.
+   */
+  @Override
+  public void endWindow()
+  {
+    // If less keys got scanned in this window, scan till the recorded number of calls
+    while (replay && scanCallsInCurrentWindow < recoveryState.numberOfScanCallsInWindow) {
+      scanKeysFromOffset();
+      processTuples();
+    }
+    super.endWindow();
+
+    recoveryState.scanOffsetAtBeginWindow = scanOffset;
+    recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow;
+
+    if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) {
+      // State for this window is already stored
+      return;
+    }
+    try {
+      getIdempotentStorageManager().save(recoveryState, context.getId(), currentWindowId);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+ /**
+ * Tears down the parent operator first and then the idempotent storage
+ * manager.
+ */
+ @Override
+ public void teardown()
+ {
+ super.teardown();
+ getIdempotentStorageManager().teardown();
+ }
+
+ /*
+ * Returns the count that is passed to the scan parameters for each Redis key
+ * scan.
+ */
+ public int getScanCount()
+ {
+ return scanCount;
+ }
+
+ /*
+ * Sets the count that is passed to the scan parameters for each Redis key
+ * scan. The scan parameters are created in setup, so the value is read there.
+ */
+ public void setScanCount(int scanCount)
+ {
+ this.scanCount = scanCount;
+ }
+
+ /**
+ * Scans the next batch of keys and hands them to processTuples.
+ */
+ @Override
+ public void emitTuples()
+ {
+ scanKeysFromOffset();
+ processTuples();
+ }
+
+ /**
+ * Called after every key scan (from emitTuples, and from endWindow during
+ * replay) to turn the scanned keys held in the keys field into tuples.
+ */
+ abstract public void processTuples();
+
+ /**
+ * Checkpoint notification; nothing is done here.
+ */
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ /**
+ * Commit notification: asks the idempotent storage manager to delete the
+ * state stored for this operator up to the committed window.
+ *
+ * @param windowId id of the committed window
+ * @throws RuntimeException wrapping the IOException when the deletion fails
+ */
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ getIdempotentStorageManager().deleteUpTo(context.getId(), windowId);
+ } catch (IOException e) {
+ throw new RuntimeException("committing", e);
+ }
+ }
+
+ /*
+ * Returns the idempotent storage manager instance used to save and load the
+ * per-window recovery state.
+ */
+ public IdempotentStorageManager getIdempotentStorageManager()
+ {
+ return idempotentStorageManager;
+ }
+
+ /*
+ * Sets the idempotent storage manager instance used to save and load the
+ * per-window recovery state.
+ */
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+ {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
new file mode 100644
index 0000000..8f419bd
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of a Redis input operator for fetching key-value
+ * pairs stored in Redis. It fetches the values of the scanned keys and emits
+ * the corresponding &lt;Key, Value&gt; pair. Value data type is String in this case.
+ *
+ * @displayName Redis Input Operator for Key Value pair
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, String>>
+{
+  private List<Object> keysObjectList = new ArrayList<Object>();
+
+  /**
+   * Fetches the values of all scanned keys in one call and emits one pair per
+   * key; a key without a value is emitted with a null value. The scanned keys
+   * are cleared afterwards.
+   */
+  @Override
+  public void processTuples()
+  {
+    keysObjectList = new ArrayList<Object>(keys);
+    if (keysObjectList.isEmpty()) {
+      return;
+    }
+
+    List<Object> fetchedValues = store.getAll(keysObjectList);
+    for (int idx = 0; idx < fetchedValues.size() && idx < keys.size(); idx++) {
+      Object rawValue = fetchedValues.get(idx);
+      String value = (rawValue == null) ? null : rawValue.toString();
+      outputPort.emit(new KeyValPair<String, String>(keys.get(idx), value));
+    }
+    keys.clear();
+    keysObjectList.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
new file mode 100644
index 0000000..66ef582
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.Map;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is the an implementation of a Redis input operator It takes in keys to
+ * fetch and emits Values stored as Maps in Redis i.e. when value datatype in
+ * Redis is HashMap
+ *
+ * @displayName Redis Input Operator for Map
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+
+public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Map<String, String>>>
+{
+ @Override
+ public void processTuples()
+ {
+ for (String key : keys) {
+ if (store.getType(key).equals("hash")) {
+ Map<String, String> mapValue = store.getMap(key);
+ outputPort.emit(new KeyValPair<String, Map<String, String>>(key, mapValue));
+ }
+ }
+ keys.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
new file mode 100644
index 0000000..5a73e61
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
@@ -0,0 +1,204 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.SetterDouble;
+import com.datatorrent.lib.util.PojoUtils.SetterFloat;
+import com.datatorrent.lib.util.PojoUtils.SetterInt;
+import com.datatorrent.lib.util.PojoUtils.SetterLong;
+import com.datatorrent.lib.util.PojoUtils.SetterShort;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is a Redis input operator which scans all keys in the Redis store. It
+ * converts each value stored as a map to a Plain Old Java Object and outputs a
+ * KeyValPair with the POJO as value.
+ * <p>
+ * This input adapter reads entries stored in RedisStore as &lt;Key, Map&gt; and
+ * emits a key value pair &lt;key, POJO&gt; as tuples.
+ * </p>
+ *
+ * @displayName Redis POJO Input Operator
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+@Evolving
+public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Object>>
+{
+  protected final Map<String, Object> map = new HashMap<String, Object>();
+  private ArrayList<FieldInfo> dataColumns;
+  private transient ArrayList<Object> setters;
+  // Transient on purpose: it guards the creation of the transient setters list.
+  // If the flag were kept as false while the setters list came back empty,
+  // convertMapToObject would skip every column and emit unpopulated objects.
+  private transient boolean isFirstTuple = true;
+  private String outputClass;
+  private Class<?> objectClass;
+
+  public RedisPOJOInputOperator()
+  {
+    super();
+    setters = new ArrayList<Object>();
+  }
+
+  /**
+   * Creates an instance of the output class and fills it from the given map,
+   * one configured data column at a time.
+   * A column that is missing in the map yields a null value; for the numeric
+   * column types parsing a null value throws, and the exception is passed to
+   * DTThrowable.wrapIfChecked.
+   *
+   * @param tuple field name to value map read from Redis
+   * @return the populated object
+   */
+  @SuppressWarnings("unchecked")
+  private Object convertMapToObject(Map<String, String> tuple)
+  {
+    try {
+      Object mappedObject = objectClass.newInstance();
+      for (int i = 0; i < dataColumns.size(); i++) {
+        final SupportType type = dataColumns.get(i).getType();
+        final String columnName = dataColumns.get(i).getColumnName();
+
+        if (i < setters.size()) {
+          String value = tuple.get(columnName);
+          switch (type) {
+            case STRING:
+              ((Setter<Object, String>) setters.get(i)).set(mappedObject, value);
+              break;
+            case BOOLEAN:
+              ((SetterBoolean) setters.get(i)).set(mappedObject, Boolean.parseBoolean(value));
+              break;
+            case SHORT:
+              ((SetterShort) setters.get(i)).set(mappedObject, Short.parseShort(value));
+              break;
+            case INTEGER:
+              ((SetterInt) setters.get(i)).set(mappedObject, Integer.parseInt(value));
+              break;
+            case LONG:
+              ((SetterLong) setters.get(i)).set(mappedObject, Long.parseLong(value));
+              break;
+            case FLOAT:
+              ((SetterFloat) setters.get(i)).set(mappedObject, Float.parseFloat(value));
+              break;
+            case DOUBLE:
+              ((SetterDouble) setters.get(i)).set(mappedObject, Double.parseDouble(value));
+              break;
+            default:
+              break;
+          }
+        }
+      }
+      return mappedObject;
+    } catch (Exception e) {
+      DTThrowable.wrapIfChecked(e);
+    }
+    return null;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+  }
+
+  /**
+   * Loads the configured output class and creates one setter per configured
+   * data column, in column order.
+   *
+   * @param value the first map read from Redis (not inspected)
+   * @throws ClassNotFoundException when the output class cannot be loaded
+   */
+  public void processFirstTuple(Map<String, String> value) throws ClassNotFoundException
+  {
+    objectClass = Class.forName(getOutputClass());
+
+    final int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String setterExpression = dataColumns.get(i).getPojoFieldExpression();
+      final Object setter;
+      switch (type) {
+        case STRING:
+          setter = PojoUtils.createSetter(objectClass, setterExpression, String.class);
+          break;
+        case BOOLEAN:
+          setter = PojoUtils.createSetterBoolean(objectClass, setterExpression);
+          break;
+        case SHORT:
+          setter = PojoUtils.createSetterShort(objectClass, setterExpression);
+          break;
+        case INTEGER:
+          setter = PojoUtils.createSetterInt(objectClass, setterExpression);
+          break;
+        case LONG:
+          setter = PojoUtils.createSetterLong(objectClass, setterExpression);
+          break;
+        case FLOAT:
+          setter = PojoUtils.createSetterFloat(objectClass, setterExpression);
+          break;
+        case DOUBLE:
+          setter = PojoUtils.createSetterDouble(objectClass, setterExpression);
+          break;
+        default:
+          setter = PojoUtils.createSetter(objectClass, setterExpression, Object.class);
+          break;
+      }
+      setters.add(setter);
+    }
+  }
+
+  /**
+   * Emits a &lt;key, POJO&gt; pair for every scanned key whose type is "hash".
+   * The setters are created when the first hash is seen. The scanned keys are
+   * cleared afterwards.
+   */
+  @Override
+  public void processTuples()
+  {
+    for (String key : keys) {
+      if (store.getType(key).equals("hash")) {
+        Map<String, String> mapValue = store.getMap(key);
+        if (isFirstTuple) {
+          try {
+            processFirstTuple(mapValue);
+          } catch (ClassNotFoundException e) {
+            DTThrowable.rethrow(e);
+          }
+        }
+        isFirstTuple = false;
+        outputPort.emit(new KeyValPair<String, Object>(key, convertMapToObject(mapValue)));
+      }
+    }
+    keys.clear();
+  }
+
+  /*
+   * Output class type
+   */
+  public String getOutputClass()
+  {
+    return outputClass;
+  }
+
+  public void setOutputClass(String outputClass)
+  {
+    this.outputClass = outputClass;
+  }
+
+  /*
+   * An arraylist of data columns to be read from the Map stored in Redis. Gets
+   * column names, column expressions and column data types
+   */
+  public ArrayList<FieldInfo> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
new file mode 100644
index 0000000..8966248
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+/**
+ * This is a Redis output operator, which takes a Key and corresponding Plain
+ * Old Java Object as input. And writes a Map out to Redis based on Expressions
+ * provided.
+ * <p>
+ * This output adapter takes a Key value pair &lt;key, POJO&gt; as tuples and just
+ * writes to the redis store with the key and the value is a Map containing
+ * object attributes as &lt;keys,value&gt; Note: Redis output operator should never
+ * use the passthrough method because it begins a transaction at beginWindow and
+ * commits a transaction at endWindow, and a transaction in Redis blocks all
+ * other clients.
+ * </p>
+ *
+ * @displayName Redis POJO Output Operator
+ * @category Store
+ * @tags output operator, key value
+ *
+ */
+public class RedisPOJOOutputOperator extends AbstractRedisAggregateOutputOperator<KeyValPair<String, Object>>
+{
+  protected final Map<String, Object> map = new HashMap<String, Object>();
+  private ArrayList<FieldInfo> dataColumns;
+  private transient ArrayList<Object> getters;
+  // Transient on purpose: it guards the creation of the transient getters list.
+  // If the flag were kept as false while the getters list came back empty,
+  // convertObjectToMap would skip every column and write empty maps.
+  private transient boolean isFirstTuple = true;
+
+  public RedisPOJOOutputOperator()
+  {
+    super();
+    getters = new ArrayList<Object>();
+  }
+
+  /**
+   * Converts every collected POJO to a map and writes it to the store under
+   * its key.
+   */
+  @Override
+  public void storeAggregate()
+  {
+    for (Entry<String, Object> entry : map.entrySet()) {
+
+      Map<String, String> mapObject = convertObjectToMap(entry.getValue());
+      store.put(entry.getKey(), mapObject);
+    }
+  }
+
+  /**
+   * Reads every configured data column from the given object and returns the
+   * values as strings keyed by column name. Columns whose value is null are
+   * left out of the map.
+   *
+   * @param tuple the object to convert
+   * @return column name to string value map
+   */
+  private Map<String, String> convertObjectToMap(Object tuple)
+  {
+    Map<String, String> mappedObject = new HashMap<String, String>();
+    // Only columns for which a getter was created can be read
+    final int size = Math.min(dataColumns.size(), getters.size());
+    for (int i = 0; i < size; i++) {
+      final FieldInfo column = dataColumns.get(i);
+      final String value = readFieldAsString(column.getType(), getters.get(i), tuple);
+      if (value != null) {
+        mappedObject.put(column.getColumnName(), value);
+      }
+    }
+
+    return mappedObject;
+  }
+
+  /**
+   * Invokes the getter through the interface it was created with in
+   * processFirstTuple. The primitive getters (boolean, short, long, float,
+   * double) are not Getter instances, so casting them to Getter would fail.
+   *
+   * @param type column type the getter was created for
+   * @param getter getter created by processFirstTuple for that column
+   * @param tuple object to read from
+   * @return the value as string, or null when the value is null
+   */
+  @SuppressWarnings("unchecked")
+  private static String readFieldAsString(SupportType type, Object getter, Object tuple)
+  {
+    switch (type) {
+      case BOOLEAN:
+        return String.valueOf(((PojoUtils.GetterBoolean<Object>) getter).get(tuple));
+      case SHORT:
+        return String.valueOf(((PojoUtils.GetterShort<Object>) getter).get(tuple));
+      case LONG:
+        return String.valueOf(((PojoUtils.GetterLong<Object>) getter).get(tuple));
+      case FLOAT:
+        return String.valueOf(((PojoUtils.GetterFloat<Object>) getter).get(tuple));
+      case DOUBLE:
+        return String.valueOf(((PojoUtils.GetterDouble<Object>) getter).get(tuple));
+      default:
+        // STRING, INTEGER and all remaining types are created with PojoUtils.createGetter
+        Object value = ((Getter<Object, Object>) getter).get(tuple);
+        return value == null ? null : value.toString();
+    }
+  }
+
+  /**
+   * Creates one getter per configured data column, in column order, using the
+   * class of the value of the first tuple.
+   *
+   * @param tuple the first tuple received
+   */
+  public void processFirstTuple(KeyValPair<String, Object> tuple)
+  {
+    Object value = tuple.getValue();
+
+    final Class<?> fqcn = value.getClass();
+    final int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
+      final Object getter;
+      switch (type) {
+        case STRING:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+          break;
+        case BOOLEAN:
+          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+          break;
+        case SHORT:
+          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+          break;
+        case INTEGER:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, type.getJavaType());
+          break;
+        case LONG:
+          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+          break;
+        case FLOAT:
+          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+          break;
+        case DOUBLE:
+          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+          break;
+        default:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+          break;
+      }
+      getters.add(getter);
+    }
+  }
+
+  /**
+   * Collects the tuple for the next storeAggregate call; the getters are
+   * created from the first tuple received.
+   */
+  @Override
+  public void processTuple(KeyValPair<String, Object> tuple)
+  {
+    if (isFirstTuple) {
+      processFirstTuple(tuple);
+    }
+
+    isFirstTuple = false;
+    map.put(tuple.getKey(), tuple.getValue());
+  }
+
+  /*
+   * An arraylist of data column names to be set in Redis store as a Map. Gets
+   * column names, column expressions and column data types
+   */
+  public ArrayList<FieldInfo> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
index ea8e26b..2acc1d5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Transaction;
import com.datatorrent.lib.db.TransactionableKeyValueStore;
@@ -181,6 +183,26 @@ public class RedisStore implements TransactionableKeyValueStore
return jedis.get(key.toString());
}
+ /**
+ * Returns what the redis TYPE call reports for the given key.
+ *
+ * @param key
+ * @return the type string returned by jedis for the key.
+ */
+ public String getType(String key)
+ {
+ return jedis.type(key);
+ }
+
+ /**
+ * Gets the stored Map for the given key, when the value data type is a map, stored with hmset.
+ * Must not be called while a redis transaction is open; a RuntimeException is thrown in that case.
+ *
+ * @param key
+ * @return hashmap stored for the key.
+ */
+ public Map<String, String> getMap(Object key)
+ {
+ if (isInTransaction()) {
+ throw new RuntimeException("Cannot call get when in redis transaction");
+ }
+ return jedis.hgetAll(key.toString());
+ }
+
+
/**
* Gets all the values given the keys.
* Note that it does NOT work with hash values or list values
@@ -255,6 +277,11 @@ public class RedisStore implements TransactionableKeyValueStore
}
}
+ /**
+ * Calls scan on the redis store with the given offset as cursor.
+ *
+ * @param offset cursor to scan from; passed to jedis as a string
+ * @param params scan parameters passed to jedis
+ * @return the scan result returned by jedis
+ */
+ public ScanResult<String> ScanKeys(Integer offset, ScanParams params)
+ {
+ return jedis.scan(offset.toString(), params);
+ }
+
/**
* Calls hincrbyfloat on the redis store.
*
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
new file mode 100644
index 0000000..08fb294
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisInputOperatorTest
+{
+ private RedisStore operatorStore;
+ private RedisStore testStore;
+
+ /**
+ * Test operator that appends every tuple received on its input port to the
+ * static resultMap list and counts it in resultCount.
+ * NOTE(review): the state is static, so it is shared by every user of this
+ * class and is not reset between tests.
+ */
+ public static class CollectorModule extends BaseOperator
+ {
+ volatile static List<KeyValPair<String, String>> resultMap = new ArrayList<KeyValPair<String, String>>();
+ static long resultCount = 0;
+
+ public final transient DefaultInputPort<KeyValPair<String, String>> inputPort = new DefaultInputPort<KeyValPair<String, String>>()
+ {
+ @Override
+ public void process(KeyValPair<String, String> tuple)
+ {
+ resultMap.add(tuple);
+ resultCount++;
+ }
+ };
+ }
+
+  /**
+   * Writes three key-value pairs to Redis, runs the key-value input operator
+   * in a local cluster until at least three tuples are collected (or the
+   * timeout expires) and checks that the three pairs were emitted.
+   */
+  @Test
+  public void testIntputOperator() throws IOException
+  {
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+
+    // The collector keeps its results in static fields; start from a clean state
+    CollectorModule.resultMap.clear();
+    CollectorModule.resultCount = 0;
+
+    testStore.put("test_abc", "789");
+    testStore.put("test_def", "456");
+    testStore.put("test_ghi", "123");
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      DAG dag = lma.getDAG();
+
+      RedisKeyValueInputOperator inputOperator = dag.addOperator("input", new RedisKeyValueInputOperator());
+      final CollectorModule collector = dag.addOperator("collector", new CollectorModule());
+
+      inputOperator.setStore(operatorStore);
+      dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+      final LocalMode.Controller lc = lma.getController();
+
+      new Thread("LocalClusterController")
+      {
+        @Override
+        public void run()
+        {
+          long startTms = System.currentTimeMillis();
+          long timeout = 50000L;
+          try {
+            Thread.sleep(1000);
+            while (System.currentTimeMillis() - startTms < timeout) {
+              if (CollectorModule.resultMap.size() < 3) {
+                Thread.sleep(10);
+              } else {
+                break;
+              }
+            }
+          } catch (InterruptedException ex) {
+            // Stop waiting, keep the interrupt status and shut the cluster down
+            Thread.currentThread().interrupt();
+          }
+          lc.shutdown();
+        }
+      }.start();
+
+      lc.run();
+
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_abc", "789")));
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_def", "456")));
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_ghi", "123")));
+    } finally {
+      // Remove exactly the keys this test created: the operator scans the whole
+      // store, so the collected tuples may contain keys that are not ours, and
+      // may be empty when the run failed
+      testStore.remove("test_abc");
+      testStore.remove("test_def");
+      testStore.remove("test_ghi");
+      testStore.disconnect();
+    }
+  }
+
+  /**
+   * Runs two windows with a file system backed idempotent storage manager,
+   * re-creates the operator to simulate a re-deployment and checks that
+   * replaying both windows emits the same number of tuples as the first run.
+   */
+  @Test
+  public void testRecoveryAndIdempotency() throws Exception
+  {
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+
+    testStore.put("test_abc", "789");
+    testStore.put("test_def", "456");
+    testStore.put("test_ghi", "123");
+
+    RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator();
+    operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
+    operator.setStore(operatorStore);
+    operator.setScanCount(1);
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+    operator.outputPort.setSink(sink);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+    try {
+      operator.setup(context);
+      operator.beginWindow(1);
+      operator.emitTuples();
+      operator.endWindow();
+
+      int numberOfMessagesInWindow1 = sink.collectedTuples.size();
+      sink.collectedTuples.clear();
+
+      operator.beginWindow(2);
+      operator.emitTuples();
+      operator.endWindow();
+      int numberOfMessagesInWindow2 = sink.collectedTuples.size();
+      sink.collectedTuples.clear();
+
+      // failure and then re-deployment of operator
+      // Re-instantiating to reset values
+      operator = new RedisKeyValueInputOperator();
+      operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+      operator.setStore(operatorStore);
+      operator.setScanCount(1);
+      operator.outputPort.setSink(sink);
+      operator.setup(context);
+
+      Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+      operator.beginWindow(1);
+      operator.emitTuples();
+      operator.emitTuples();
+      operator.endWindow();
+
+      Assert.assertEquals("num of messages in window 1", numberOfMessagesInWindow1, sink.collectedTuples.size());
+
+      sink.collectedTuples.clear();
+      operator.beginWindow(2);
+      operator.emitTuples();
+      operator.endWindow();
+      Assert.assertEquals("num of messages in window 2", numberOfMessagesInWindow2, sink.collectedTuples.size());
+    } finally {
+      // The sink is cleared several times above, so it cannot be used to find
+      // the keys to delete; remove exactly the keys this test created
+      testStore.remove("test_abc");
+      testStore.remove("test_def");
+      testStore.remove("test_ghi");
+      testStore.disconnect();
+
+      sink.collectedTuples.clear();
+      operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5);
+      operator.teardown();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
new file mode 100644
index 0000000..7792b5a
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
@@ -0,0 +1,230 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.redis.RedisInputOperatorTest.CollectorModule;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisPOJOOperatorTest
+{
+ private RedisStore operatorStore;
+ private RedisStore testStore;
+
+ /**
+  * Mutable POJO used as the tuple value in these tests. It carries one
+  * integer property and one string property.
+  */
+ public static class TestClass
+ {
+   private Integer intValue;
+   private String stringValue;
+
+   /** Creates an instance with both properties left unset ({@code null}). */
+   public TestClass()
+   {
+   }
+
+   /**
+    * Creates an instance with both properties populated.
+    *
+    * @param v1 initial integer value
+    * @param v2 initial string value
+    */
+   public TestClass(int v1, String v2)
+   {
+     this.intValue = Integer.valueOf(v1);
+     this.stringValue = v2;
+   }
+
+   /** @return the string property, or {@code null} if it was never set */
+   public String getStringValue()
+   {
+     return this.stringValue;
+   }
+
+   /** @param stringValue new value of the string property */
+   public void setStringValue(String stringValue)
+   {
+     this.stringValue = stringValue;
+   }
+
+   /** @return the integer property, or {@code null} if it was never set */
+   public Integer getIntValue()
+   {
+     return this.intValue;
+   }
+
+   /** @param intValue new value of the integer property */
+   public void setIntValue(int intValue)
+   {
+     this.intValue = Integer.valueOf(intValue);
+   }
+ }
+
+ /**
+  * Sends a single POJO tuple through {@link RedisPOJOOutputOperator} and checks
+  * that its two mapped fields can be read back from Redis as a map stored
+  * under the tuple's key.
+  */
+ @Test
+ public void testOutputOperator() throws IOException
+ {
+   final String appId = "test_appid";
+   final int operatorId = 0;
+   final String redisKey = "test_abc1";
+
+   // Clear the committed window id for this app/operator pair before the operator is set up.
+   this.operatorStore = new RedisStore();
+   operatorStore.connect();
+   operatorStore.removeCommittedWindowId(appId, operatorId);
+   operatorStore.disconnect();
+
+   RedisPOJOOutputOperator outputOperator = new RedisPOJOOutputOperator();
+
+   // column1 is read through the field expression, column2 through the getter expression.
+   ArrayList<FieldInfo> columns = new ArrayList<FieldInfo>();
+   columns.add(new FieldInfo("column1", "intValue", SupportType.INTEGER));
+   columns.add(new FieldInfo("column2", "getStringValue()", SupportType.STRING));
+   outputOperator.setDataColumns(columns);
+
+   try {
+     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attrs = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+     attrs.put(DAG.APPLICATION_ID, appId);
+
+     outputOperator.setStore(operatorStore);
+     outputOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attrs));
+
+     // Push exactly one tuple through one window, then tear the operator down.
+     outputOperator.beginWindow(101);
+     outputOperator.input.process(new KeyValPair<String, Object>(redisKey, new TestClass(1, "abc")));
+     outputOperator.endWindow();
+     outputOperator.teardown();
+
+     // Open a connection on the store again to read back what was written.
+     operatorStore.connect();
+     Map<String, String> stored = operatorStore.getMap(redisKey);
+     Assert.assertEquals("1", stored.get("column1"));
+     Assert.assertEquals("abc", stored.get("column2"));
+   } finally {
+     operatorStore.remove(redisKey);
+     operatorStore.disconnect();
+   }
+ }
+
+ /**
+  * Test sink operator that records every received tuple in static state so the
+  * test can inspect the results while and after the DAG runs.
+  */
+ public static class ObjectCollectorModule extends BaseOperator
+ {
+   // The test polls this map from its own controller thread while process() is filling it
+   // (presumably on an engine thread - confirm), so the map itself has to be thread-safe;
+   // volatile only guards the reference, not the contents.
+   volatile static Map<String, Object> resultMap = java.util.Collections.synchronizedMap(new HashMap<String, Object>());
+   // Number of tuples received, including repeated keys.
+   static long resultCount = 0;
+
+   /** Input port that stores each tuple's value under its key and counts the tuple. */
+   public final transient DefaultInputPort<KeyValPair<String, Object>> inputPort = new DefaultInputPort<KeyValPair<String, Object>>()
+   {
+     @Override
+     public void process(KeyValPair<String, Object> tuple)
+     {
+       resultMap.put(tuple.getKey(), tuple.getValue());
+       resultCount++;
+     }
+   };
+ }
+
+ /**
+  * Stores three map entries in Redis, runs a local DAG that reads them with
+  * {@link RedisPOJOInputOperator} into an {@link ObjectCollectorModule}, and checks
+  * that all three keys are collected and that the first value arrives as a
+  * populated {@link TestClass}.
+  */
+ @Test
+ public void testInputOperator() throws IOException
+ {
+   // NOTE(review): presumably referenced so the test fails fast when janino is not on the classpath - confirm.
+   @SuppressWarnings("unused")
+   Class<?> clazz = org.codehaus.janino.CompilerFactory.class;
+
+   final String keyAbc = "test_abc_in";
+   final String keyDef = "test_def_in";
+   final String keyGhi = "test_ghi_in";
+
+   // The collector keeps its results in static fields; start from a clean slate.
+   ObjectCollectorModule.resultMap.clear();
+   ObjectCollectorModule.resultCount = 0;
+
+   this.operatorStore = new RedisStore();
+   this.testStore = new RedisStore();
+
+   testStore.connect();
+
+   try {
+     Map<String, String> value = new HashMap<String, String>();
+     value.put("Column1", "abc");
+     value.put("Column2", "1");
+
+     Map<String, String> value1 = new HashMap<String, String>();
+     value1.put("Column1", "def");
+     value1.put("Column2", "2");
+
+     Map<String, String> value2 = new HashMap<String, String>();
+     value2.put("Column1", "ghi");
+     value2.put("Column2", "3");
+
+     // Written inside the try block so a failure part-way through still cleans up and disconnects.
+     testStore.put(keyAbc, value);
+     testStore.put(keyDef, value1);
+     testStore.put(keyGhi, value2);
+
+     LocalMode lma = LocalMode.newInstance();
+     DAG dag = lma.getDAG();
+
+     RedisPOJOInputOperator inputOperator = dag.addOperator("input", new RedisPOJOInputOperator());
+     final ObjectCollectorModule collector = dag.addOperator("collector", new ObjectCollectorModule());
+
+     ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+     fields.add(new FieldInfo("Column1", "stringValue", SupportType.STRING));
+     fields.add(new FieldInfo("Column2", "intValue", SupportType.INTEGER));
+
+     inputOperator.setDataColumns(fields);
+     inputOperator.setOutputClass(TestClass.class.getName());
+
+     inputOperator.setStore(operatorStore);
+     dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+     final LocalMode.Controller lc = lma.getController();
+
+     // Watchdog: waits at least one second, then polls until three keys were collected or
+     // ten seconds have passed since it started, and finally shuts the local cluster down.
+     new Thread("LocalClusterController")
+     {
+       @Override
+       public void run()
+       {
+         long startTms = System.currentTimeMillis();
+         long timeout = 10000L;
+         try {
+           Thread.sleep(1000);
+           while (System.currentTimeMillis() - startTms < timeout && ObjectCollectorModule.resultMap.size() < 3) {
+             Thread.sleep(10);
+           }
+         } catch (InterruptedException ignored) {
+           // Interrupted while waiting: stop polling and shut the cluster down right away.
+         }
+         lc.shutdown();
+       }
+     }.start();
+
+     lc.run();
+
+     Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey(keyAbc));
+     Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey(keyDef));
+     Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey(keyGhi));
+
+     TestClass a = (TestClass) ObjectCollectorModule.resultMap.get(keyAbc);
+     Assert.assertNotNull(a);
+     Assert.assertEquals("abc", a.getStringValue());
+     Assert.assertEquals("1", a.getIntValue().toString());
+   } finally {
+     // Remove exactly the keys this test wrote, whether or not the collector ever saw them.
+     testStore.remove(keyAbc);
+     testStore.remove(keyDef);
+     testStore.remove(keyGhi);
+     testStore.disconnect();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/demos/machinedata/pom.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/pom.xml b/demos/machinedata/pom.xml
index 1f3f075..3498d0d 100644
--- a/demos/machinedata/pom.xml
+++ b/demos/machinedata/pom.xml
@@ -31,7 +31,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
- <version>2.2.1</version>
+ <version>2.5.1</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>