You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:47 UTC

[14/28] git commit: [streaming] Updated DBStates

[streaming] Updated DBStates


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ce55dc34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ce55dc34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ce55dc34

Branch: refs/heads/master
Commit: ce55dc34480dc4fe2d84f24a3433bba8ed4921fe
Parents: 6cf15c2
Author: ghermann <re...@gmail.com>
Authored: Tue Aug 19 18:04:19 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:56 2014 +0200

----------------------------------------------------------------------
 .../db/CustomSerializationDBState.java          |  31 +++++
 .../streaming/connectors/db/DBSerializer.java   |  36 ++++++
 .../flink/streaming/connectors/db/DBState.java  |  10 +-
 .../connectors/db/DBStateIterator.java          |  11 +-
 .../connectors/db/DBStateWithIterator.java      |  25 ++++
 .../connectors/db/DefaultDBSerializer.java      |  35 ++++++
 .../streaming/connectors/db/LevelDBState.java   | 125 +++++++++++++++++++
 .../streaming/connectors/db/LeveldbState.java   |  68 ----------
 .../connectors/db/LeveldbStateIterator.java     |  47 -------
 .../streaming/connectors/db/MemcachedState.java |  32 +++--
 .../streaming/connectors/db/RedisState.java     |  96 +++++++++++---
 .../connectors/db/RedisStateIterator.java       |  59 ---------
 .../streaming/connectors/db/DBStateTest.java    | 105 ++++++++++++++++
 .../streaming/connectors/db/LeveldbTest.java    |  43 -------
 .../streaming/connectors/db/MemcachedTest.java  |  38 ------
 .../streaming/connectors/db/RedisTest.java      |  43 -------
 16 files changed, 467 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
new file mode 100644
index 0000000..c5876a5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+public abstract class CustomSerializationDBState<K extends Serializable, V extends Serializable> {
+	// Base for DB states that store keys and values as bytes; subclasses use these serializers.
+	protected DBSerializer<K> keySerializer;
+	protected DBSerializer<V> valueSerializer;
+	/** Stores the given serializers as-is; no null check is performed. */
+	public CustomSerializationDBState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
new file mode 100644
index 0000000..d94c39b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+/**
+ * Interface for custom serialization of keys and values used in external
+ * databases.
+ *
+ * @param <T>
+ *            Type of the key or value to serialize
+ */
+public interface DBSerializer<T extends Serializable> {
+	/** Converts {@code object} into the byte array that is handed to the database. */
+	byte[] write(T object);
+	/** Rebuilds an object from its serialized bytes (intended as the inverse of {@code write}). */
+	T read(byte[] serializedObject);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
index 3dcac19..7194656 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.streaming.connectors.db;
 
-public interface DBState {
+public interface DBState<K, V> {
 	
-	//TODO: consider more general parameters
-	public void put(String key, String value);
+	public void put(K key, V value);
 	
-	public String get(String key);
+	public V get(K key);
 	
-	public void remove(String key);
+	public void remove(K key);
 
+	public void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
index d586ee7..6168e14 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.db;
 
-public interface DBStateIterator {
+import java.io.Serializable;
 
-	public boolean hasNext();
+public abstract class DBStateIterator<K extends Serializable, V extends Serializable> {
 
-	public String getNextKey();
+	public abstract boolean hasNext();
 
-	public String getNextValue();
+	public abstract K getNextKey();
 
-	public void next();
+	public abstract V getNextValue();
 
+	public abstract void next();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
new file mode 100644
index 0000000..c0f18fb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+public interface DBStateWithIterator<K extends Serializable, V extends Serializable> extends DBState<K, V> {
+	/** Returns a {@link DBStateIterator} over the key-value entries held by this state. */
+	public DBStateIterator<K, V> getIterator();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
new file mode 100644
index 0000000..b019456
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+public class DefaultDBSerializer<T extends Serializable>  implements DBSerializer<T> {
+	// Default serializer: delegates both directions to commons-lang3 SerializationUtils.
+	@Override
+	public byte[] write(T object) {
+		return SerializationUtils.serialize(object);
+	}
+	// The result type is inferred, not checked here: callers must know the bytes encode a T.
+	@Override
+	public T read(byte[] serializedObject) {
+		return SerializationUtils.deserialize(serializedObject);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
new file mode 100644
index 0000000..edb2d20
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import static org.fusesource.leveldbjni.JniDBFactory.factory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+
+/**
+ * Interface to a LevelDB key-value store.
+ * 
+ * See <a href="https://code.google.com/p/leveldb/">https://code.google.com/p/leveldb/</a>
+ * 
+ * @param <K>
+ *            Type of key
+ * @param <V>
+ *            Type of value
+ */
+public class LevelDBState<K extends Serializable, V extends Serializable> extends
+		CustomSerializationDBState<K, V> implements DBStateWithIterator<K, V> {
+
+	private DB database;
+	/** Opens a LevelDB database at {@code dbName}; any database already at that path is destroyed first. */
+	public LevelDBState(String dbName, DBSerializer<K> keySerializer,
+			DBSerializer<V> valueSerializer) {
+		super(keySerializer, valueSerializer);
+		Options options = new Options();
+		File file = new File(dbName);
+		options.createIfMissing(true);
+		try {
+			factory.destroy(file, options);
+			database = factory.open(file, options);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+	/** Same as the three-argument constructor, using {@link DefaultDBSerializer} for keys and values. */
+	public LevelDBState(String dbName) {
+		this(dbName, new DefaultDBSerializer<K>(), new DefaultDBSerializer<V>());
+	}
+	/** Closes the underlying database; an {@code IOException} is rethrown as {@code RuntimeException}. */
+	@Override
+	public void close() {
+		try {
+			database.close();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void put(K key, V value) {
+		database.put(keySerializer.write(key), valueSerializer.write(value));
+	}
+	/** Returns the value stored under {@code key}; throws {@code RuntimeException} if there is no entry. */
+	@Override
+	public V get(K key) {
+		byte[] serializedValue = database.get(keySerializer.write(key));
+		if (serializedValue != null) {
+			return valueSerializer.read(serializedValue);
+		} else {
+			throw new RuntimeException("No such entry at key " + key);
+		}
+	}
+
+	@Override
+	public void remove(K key) {
+		database.delete(keySerializer.write(key));
+	}
+
+	@Override
+	public DBStateIterator<K, V> getIterator() {
+		return new LevelDBStateIterator();
+	}
+	/** Iterator over the whole database, positioned at the first entry on creation. */
+	private class LevelDBStateIterator extends DBStateIterator<K, V> {
+		private DBIterator iterator;
+
+		public LevelDBStateIterator() {
+			this.iterator = database.iterator();
+			this.iterator.seekToFirst();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return iterator.hasNext();
+		}
+		/** Decodes the key of the upcoming entry without advancing (uses {@code peekNext}). */
+		@Override
+		public K getNextKey() {
+			return keySerializer.read(iterator.peekNext().getKey());
+		}
+		/** Decodes the value of the upcoming entry without advancing (uses {@code peekNext}). */
+		@Override
+		public V getNextValue() {
+			return valueSerializer.read(iterator.peekNext().getValue());
+		}
+		/** Advances the underlying iterator by one entry. */
+		@Override
+		public void next() {
+			iterator.next();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java
deleted file mode 100644
index 0c476e2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import static org.fusesource.leveldbjni.JniDBFactory.asString;
-import static org.fusesource.leveldbjni.JniDBFactory.bytes;
-import static org.fusesource.leveldbjni.JniDBFactory.factory;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.Options;
-
-public class LeveldbState implements DBState{
-
-	private DB database;
-
-	public LeveldbState(String dbName) {
-		Options options = new Options();
-		options.createIfMissing(true);
-		try {
-			database = factory.open(new File(dbName), options);
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void close() {
-		try {
-			database.close();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void put(String key, String value) {
-		database.put(bytes(key), bytes(value));
-	}
-
-	public String get(String key) {
-		return asString(database.get(bytes(key)));
-	}
-
-	public void remove(String key) {
-		database.delete(bytes(key));
-	}
-
-	public LeveldbStateIterator getIterator() {
-		return new LeveldbStateIterator(database.iterator());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java
deleted file mode 100644
index 7d32826..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import static org.fusesource.leveldbjni.JniDBFactory.asString;
-
-import org.iq80.leveldb.DBIterator;
-
-public class LeveldbStateIterator implements DBStateIterator {
-	private DBIterator iterator;
-
-	public LeveldbStateIterator(DBIterator iter) {
-		this.iterator = iter;
-		this.iterator.seekToFirst();
-	}
-
-	public boolean hasNext() {
-		return iterator.hasNext();
-	}
-
-	public String getNextKey() {
-		return asString(iterator.peekNext().getKey());
-	}
-
-	public String getNextValue() {
-		return asString(iterator.peekNext().getValue());
-	}
-
-	public void next() {
-		iterator.next();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
index ef6392c..1998d9e 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
@@ -22,15 +22,23 @@ import java.net.InetSocketAddress;
 
 import net.spy.memcached.MemcachedClient;
 
-//Needs running Memcached service
-public class MemcachedState implements DBState {
+/**
+ * Interface to a Memcached key-value cache. It needs a running instance of Memcached.
+ * 
+ * See <a href="http://memcached.org/">http://memcached.org/</a>
+ * 
+ * Keys are always of type {@code String}.
+ * 
+ * @param <V>
+ *            Type of value
+ */
+public class MemcachedState<V> implements DBState<String, V> {
 
 	private MemcachedClient memcached;
 
 	public MemcachedState() {
 		try {
-			memcached = new MemcachedClient(new InetSocketAddress("localhost",
-					11211));
+			memcached = new MemcachedClient(new InetSocketAddress("localhost", 11211));
 		} catch (IOException e) {
 			throw new RuntimeException(e);
 		}
@@ -38,26 +46,30 @@ public class MemcachedState implements DBState {
 
 	public MemcachedState(String hostname, int portNum) {
 		try {
-			memcached = new MemcachedClient(new InetSocketAddress(hostname,
-					portNum));
+			memcached = new MemcachedClient(new InetSocketAddress(hostname, portNum));
 		} catch (IOException e) {
 			throw new RuntimeException(e);
 		}
 	}
 
+	@Override
 	public void close() {
 		memcached.shutdown();
 	}
 
-	public void put(String key, String value) {
+	@Override
+	public void put(String key, V value) {
 		memcached.set(key, 0, value);
 	}
 
-	public String get(String key) {
-		return (String) memcached.get(key);
+	@Override
+	@SuppressWarnings("unchecked")
+	public V get(String key) {
+		return (V) memcached.get(key);
 	}
 
+	@Override
 	public void remove(String key) {
-		memcached.delete(key);
+		memcached.delete(key.toString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
index 93ca7e2..6f77226 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
@@ -17,35 +17,93 @@
 
 package org.apache.flink.streaming.connectors.db;
 
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Set;
+
 import redis.clients.jedis.Jedis;
 
-//Needs running Redis service
-public class RedisState implements DBState {
-	
+/**
+ * Interface to a Redis key-value cache. It needs a running instance of Redis.
+ * 
+ * See <a href="http://redis.io/">http://redis.io/</a>
+ * 
+ * @param <K>
+ *            Type of key
+ * @param <V>
+ *            Type of value
+ */
+public class RedisState<K extends Serializable, V extends Serializable> extends
+		CustomSerializationDBState<K, V> implements DBStateWithIterator<K, V> {
+
 	private Jedis jedis;
-	
-	public RedisState(){
+
+	public RedisState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
+		super(keySerializer, valueSerializer);
 		jedis = new Jedis("localhost");
 	}
-	
-	public void close(){
+
+	public RedisState() {
+		this(new DefaultDBSerializer<K>(), new DefaultDBSerializer<V>());
+	}
+
+	@Override
+	public void close() {
 		jedis.close();
 	}
-	
-	public void put(String key, String value){
-		jedis.set(key, value);
+
+	@Override
+	public void put(K key, V value) {
+		jedis.set(keySerializer.write(key), valueSerializer.write(value));
 	}
-	
-	public String get(String key){
-		return jedis.get(key);
+
+	@Override
+	public V get(K key) {
+		return valueSerializer.read(jedis.get(keySerializer.write(key)));
 	}
-	
-	public void remove(String key){
-		jedis.del(key);
+
+	@Override
+	public void remove(K key) {
+		jedis.del(keySerializer.write(key));
 	}
-	
-	public RedisStateIterator getIterator(){
-		return new RedisStateIterator(jedis);
+
+	@Override
+	public DBStateIterator<K, V> getIterator() {
+		return new RedisStateIterator();
 	}
 
+	/** Cursor over all Redis keys; {@code currentKey} is the entry not yet consumed, or null when exhausted. */
+	private class RedisStateIterator extends DBStateIterator<K, V> {
+		private Set<byte[]> set;
+		private Iterator<byte[]> iterator;
+		private byte[] currentKey;
+
+		public RedisStateIterator() {
+			// "*" is the glob pattern that matches every key.
+			set = jedis.keys("*".getBytes());
+			iterator = set.iterator();
+			currentKey = iterator.hasNext() ? iterator.next() : null;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return currentKey != null;
+		}
+
+		@Override
+		public K getNextKey() {
+			return keySerializer.read(currentKey);
+		}
+
+		@Override
+		public V getNextValue() {
+			// Values are written through valueSerializer in put(), so decode them the same way.
+			return valueSerializer.read(jedis.get(currentKey));
+		}
+
+		@Override
+		public void next() {
+			currentKey = iterator.hasNext() ? iterator.next() : null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java
deleted file mode 100644
index 121012e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import java.util.Iterator;
-
-import redis.clients.jedis.Jedis;
-
-//Needs running Redis service
-public class RedisStateIterator implements DBStateIterator {
-
-	private Iterator<String> iterator;
-	private int position;
-	private int size;
-	private String currentKey;
-	private Jedis jedis;
-
-	public RedisStateIterator(Jedis jedis) {
-		this.jedis = jedis;
-		iterator = jedis.keys("*").iterator();
-		size = jedis.keys("*").size();
-		currentKey = iterator.next();
-		position = 0;
-	}
-
-	public boolean hasNext() {
-		return position != size;
-	}
-
-	public String getNextKey() {
-		return currentKey;
-	}
-
-	public String getNextValue() {
-		return jedis.get(currentKey);
-	}
-
-	public void next() {
-		position += 1;
-		if (position != size) {
-			currentKey = iterator.next();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
new file mode 100644
index 0000000..15f5a60
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
@@ -0,0 +1,105 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class DBStateTest {
+	
+	public void stateTest(DBState<String, Integer> state) {
+		state.put("k1", 1);
+		assertEquals(new Integer(1), state.get("k1"));
+
+		state.put("k2", 2);
+		state.put("k3", 3);
+		assertEquals(new Integer(2), state.get("k2"));
+		assertEquals(new Integer(3), state.get("k3"));
+		state.remove("k2");
+		
+		try {
+			state.get("k2");
+			fail();
+		} catch (Exception e) {
+		}
+	}
+	
+	private void iteratorTest(DBStateWithIterator<String, Integer> state) {
+		HashMap<String, Integer> expected = new HashMap<String, Integer>();
+		HashMap<String, Integer> result = new HashMap<String, Integer>();
+		
+		state.put("10", 10);
+		state.put("20", 20);
+		state.put("30", 30);
+
+		expected.put("10", 10);
+		expected.put("20", 20);
+		expected.put("30", 30);
+		
+		DBStateIterator<String, Integer> iterator = state.getIterator();
+		while (iterator.hasNext()) {
+			String key = iterator.getNextKey();
+			Integer value = iterator.getNextValue();
+			result.put(key, value);
+			iterator.next();
+		}
+		state.close();
+		
+		assertEquals(expected, result);
+	}
+	
+	// TODO
+	@Ignore("Creates files with no licenses")
+	@Test
+	public void levelDBTest() {
+		LevelDBState<String, Integer> state = new LevelDBState<String, Integer>("test");
+		stateTest(state);
+		state.close();
+		
+		state = new LevelDBState<String, Integer>("test");
+		iteratorTest(state);
+		state.close();
+	}
+	
+	@Ignore("Needs running Memcached")
+	@Test
+	public void memcachedTest() {
+		MemcachedState<Integer> state = new MemcachedState<Integer>();
+		stateTest(state);
+		state.close();
+	}
+
+	@Ignore("Needs running Redis")
+	@Test
+	public void redisTest() {
+		RedisState<String, Integer> state = new RedisState<String, Integer>();
+		stateTest(state);
+		state.close();
+		
+		state = new RedisState<String, Integer>();
+		iteratorTest(state);
+		state.close();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java
deleted file mode 100644
index a422940..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class LeveldbTest {
-	@Ignore("Not yet ready")
-	@Test
-	public void databaseTest() {
-		LeveldbState state = new LeveldbState("test");
-		state.put("hello", "world");
-		System.out.println(state.get("hello"));
-		state.put("big", "data");
-		state.put("flink", "streaming");
-		LeveldbStateIterator iterator = state.getIterator();
-		while (iterator.hasNext()) {
-			String key = iterator.getNextKey();
-			String value = iterator.getNextValue();
-			System.out.println("key=" + key + ", value=" + value);
-			iterator.next();
-		}
-		state.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java
deleted file mode 100644
index 37fed86..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class MemcachedTest {
-	@Ignore("Needs running Memcached service, not yet ready")
-	@Test
-	public void databaseState() {
-		MemcachedState state = new MemcachedState();
-		state.put("hello", "world");
-		state.put("big", "data");
-		state.put("flink", "streaming");
-		System.out.println(state.get("hello"));
-		System.out.println(state.get("big"));
-		System.out.println(state.get("flink"));
-		state.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java
deleted file mode 100644
index 5adf93f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class RedisTest {
-	@Ignore("Needs running Redis service, not yet ready")
-	@Test
-	public void databaseTest() {
-		RedisState state = new RedisState();
-		state.put("hello", "world");
-		System.out.println(state.get("hello"));
-		state.put("big", "data");
-		state.put("flink", "streaming");
-		RedisStateIterator iterator = state.getIterator();
-		while (iterator.hasNext()) {
-			String key = iterator.getNextKey();
-			String value = iterator.getNextValue();
-			System.out.println("key=" + key + ", value=" + value);
-			iterator.next();
-		}
-		state.close();
-	}
-}