You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:47 UTC
[14/28] git commit: [streaming] Updated DBStates
[streaming] Updated DBStates
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ce55dc34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ce55dc34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ce55dc34
Branch: refs/heads/master
Commit: ce55dc34480dc4fe2d84f24a3433bba8ed4921fe
Parents: 6cf15c2
Author: ghermann <re...@gmail.com>
Authored: Tue Aug 19 18:04:19 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:56 2014 +0200
----------------------------------------------------------------------
.../db/CustomSerializationDBState.java | 31 +++++
.../streaming/connectors/db/DBSerializer.java | 36 ++++++
.../flink/streaming/connectors/db/DBState.java | 10 +-
.../connectors/db/DBStateIterator.java | 11 +-
.../connectors/db/DBStateWithIterator.java | 25 ++++
.../connectors/db/DefaultDBSerializer.java | 35 ++++++
.../streaming/connectors/db/LevelDBState.java | 125 +++++++++++++++++++
.../streaming/connectors/db/LeveldbState.java | 68 ----------
.../connectors/db/LeveldbStateIterator.java | 47 -------
.../streaming/connectors/db/MemcachedState.java | 32 +++--
.../streaming/connectors/db/RedisState.java | 96 +++++++++++---
.../connectors/db/RedisStateIterator.java | 59 ---------
.../streaming/connectors/db/DBStateTest.java | 105 ++++++++++++++++
.../streaming/connectors/db/LeveldbTest.java | 43 -------
.../streaming/connectors/db/MemcachedTest.java | 38 ------
.../streaming/connectors/db/RedisTest.java | 43 -------
16 files changed, 467 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
new file mode 100644
index 0000000..c5876a5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+public abstract class CustomSerializationDBState<K extends Serializable, V extends Serializable> {
+
+ protected DBSerializer<K> keySerializer;
+ protected DBSerializer<V> valueSerializer;
+
+ public CustomSerializationDBState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
new file mode 100644
index 0000000..d94c39b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+/**
+ * Interface for custom serialization of keys and values used in external
+ * databases.
+ *
+ * @param <T>
+ * Type of the key or value to serialize
+ */
+public interface DBSerializer<T extends Serializable> {
+
+ byte[] write(T object);
+
+ T read(byte[] serializedObject);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
index 3dcac19..7194656 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
@@ -17,13 +17,13 @@
package org.apache.flink.streaming.connectors.db;
-public interface DBState {
+public interface DBState<K, V> {
- //TODO: consider more general parameters
- public void put(String key, String value);
+ public void put(K key, V value);
- public String get(String key);
+ public V get(K key);
- public void remove(String key);
+ public void remove(K key);
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
index d586ee7..6168e14 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
@@ -17,14 +17,15 @@
package org.apache.flink.streaming.connectors.db;
-public interface DBStateIterator {
+import java.io.Serializable;
- public boolean hasNext();
+public abstract class DBStateIterator<K extends Serializable, V extends Serializable> {
- public String getNextKey();
+ public abstract boolean hasNext();
- public String getNextValue();
+ public abstract K getNextKey();
- public void next();
+ public abstract V getNextValue();
+ public abstract void next();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
new file mode 100644
index 0000000..c0f18fb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+public interface DBStateWithIterator<K extends Serializable, V extends Serializable> extends DBState<K, V> {
+
+ public DBStateIterator<K, V> getIterator();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
new file mode 100644
index 0000000..b019456
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+public class DefaultDBSerializer<T extends Serializable> implements DBSerializer<T> {
+
+ @Override
+ public byte[] write(T object) {
+ return SerializationUtils.serialize(object);
+ }
+
+ @Override
+ public T read(byte[] serializedObject) {
+ return SerializationUtils.deserialize(serializedObject);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
new file mode 100644
index 0000000..edb2d20
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import static org.fusesource.leveldbjni.JniDBFactory.factory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+
+/**
+ * Interface to a LevelDB key-value store.
+ *
+ * See {@linktourl https://code.google.com/p/leveldb/}
+ *
+ * @param <K>
+ * Type of key
+ * @param <V>
+ * Type of value
+ */
+public class LevelDBState<K extends Serializable, V extends Serializable> extends
+ CustomSerializationDBState<K, V> implements DBStateWithIterator<K, V> {
+
+ private DB database;
+
+ public LevelDBState(String dbName, DBSerializer<K> keySerializer,
+ DBSerializer<V> valueSerializer) {
+ super(keySerializer, valueSerializer);
+ Options options = new Options();
+ File file = new File(dbName);
+ options.createIfMissing(true);
+ try {
+ factory.destroy(file, options);
+ database = factory.open(file, options);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public LevelDBState(String dbName) {
+ this(dbName, new DefaultDBSerializer<K>(), new DefaultDBSerializer<V>());
+ }
+
+ @Override
+ public void close() {
+ try {
+ database.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ database.put(keySerializer.write(key), valueSerializer.write(value));
+ }
+
+ @Override
+ public V get(K key) {
+ byte[] serializedValue = database.get(keySerializer.write(key));
+ if (serializedValue != null) {
+ return valueSerializer.read(serializedValue);
+ } else {
+ throw new RuntimeException("No such entry at key " + key);
+ }
+ }
+
+ @Override
+ public void remove(K key) {
+ database.delete(keySerializer.write(key));
+ }
+
+ @Override
+ public DBStateIterator<K, V> getIterator() {
+ return new LevelDBStateIterator();
+ }
+
+ private class LevelDBStateIterator extends DBStateIterator<K, V> {
+ private DBIterator iterator;
+
+ public LevelDBStateIterator() {
+ this.iterator = database.iterator();
+ this.iterator.seekToFirst();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public K getNextKey() {
+ return keySerializer.read(iterator.peekNext().getKey());
+ }
+
+ @Override
+ public V getNextValue() {
+ return valueSerializer.read(iterator.peekNext().getValue());
+ }
+
+ @Override
+ public void next() {
+ iterator.next();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java
deleted file mode 100644
index 0c476e2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbState.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import static org.fusesource.leveldbjni.JniDBFactory.asString;
-import static org.fusesource.leveldbjni.JniDBFactory.bytes;
-import static org.fusesource.leveldbjni.JniDBFactory.factory;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.Options;
-
-public class LeveldbState implements DBState{
-
- private DB database;
-
- public LeveldbState(String dbName) {
- Options options = new Options();
- options.createIfMissing(true);
- try {
- database = factory.open(new File(dbName), options);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() {
- try {
- database.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void put(String key, String value) {
- database.put(bytes(key), bytes(value));
- }
-
- public String get(String key) {
- return asString(database.get(bytes(key)));
- }
-
- public void remove(String key) {
- database.delete(bytes(key));
- }
-
- public LeveldbStateIterator getIterator() {
- return new LeveldbStateIterator(database.iterator());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java
deleted file mode 100644
index 7d32826..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LeveldbStateIterator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import static org.fusesource.leveldbjni.JniDBFactory.asString;
-
-import org.iq80.leveldb.DBIterator;
-
-public class LeveldbStateIterator implements DBStateIterator {
- private DBIterator iterator;
-
- public LeveldbStateIterator(DBIterator iter) {
- this.iterator = iter;
- this.iterator.seekToFirst();
- }
-
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- public String getNextKey() {
- return asString(iterator.peekNext().getKey());
- }
-
- public String getNextValue() {
- return asString(iterator.peekNext().getValue());
- }
-
- public void next() {
- iterator.next();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
index ef6392c..1998d9e 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
@@ -22,15 +22,23 @@ import java.net.InetSocketAddress;
import net.spy.memcached.MemcachedClient;
-//Needs running Memcached service
-public class MemcachedState implements DBState {
+/**
+ * Interface to a Memcached key-value cache. It needs a running instance of Memcached.
+ *
+ * See {@linktourl http://memcached.org/}
+ *
+ * @param <K>
+ * Type of key
+ * @param <V>
+ * Type of value
+ */
+public class MemcachedState<V> implements DBState<String, V> {
private MemcachedClient memcached;
public MemcachedState() {
try {
- memcached = new MemcachedClient(new InetSocketAddress("localhost",
- 11211));
+ memcached = new MemcachedClient(new InetSocketAddress("localhost", 11211));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -38,26 +46,30 @@ public class MemcachedState implements DBState {
public MemcachedState(String hostname, int portNum) {
try {
- memcached = new MemcachedClient(new InetSocketAddress(hostname,
- portNum));
+ memcached = new MemcachedClient(new InetSocketAddress(hostname, portNum));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ @Override
public void close() {
memcached.shutdown();
}
- public void put(String key, String value) {
+ @Override
+ public void put(String key, V value) {
memcached.set(key, 0, value);
}
- public String get(String key) {
- return (String) memcached.get(key);
+ @Override
+ @SuppressWarnings("unchecked")
+ public V get(String key) {
+ return (V) memcached.get(key);
}
+ @Override
public void remove(String key) {
- memcached.delete(key);
+ memcached.delete(key.toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
index 93ca7e2..6f77226 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
@@ -17,35 +17,93 @@
package org.apache.flink.streaming.connectors.db;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Set;
+
import redis.clients.jedis.Jedis;
-//Needs running Redis service
-public class RedisState implements DBState {
-
+/**
+ * Interface to a Redis key-value cache. It needs a running instance of Redis.
+ *
+ * See {@linktourl http://redis.io/}
+ *
+ * @param <K>
+ * Type of key
+ * @param <V>
+ * Type of value
+ */
+public class RedisState<K extends Serializable, V extends Serializable> extends
+ CustomSerializationDBState<K, V> implements DBStateWithIterator<K, V> {
+
private Jedis jedis;
-
- public RedisState(){
+
+ public RedisState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
+ super(keySerializer, valueSerializer);
jedis = new Jedis("localhost");
}
-
- public void close(){
+
+ public RedisState() {
+ this(new DefaultDBSerializer<K>(), new DefaultDBSerializer<V>());
+ }
+
+ @Override
+ public void close() {
jedis.close();
}
-
- public void put(String key, String value){
- jedis.set(key, value);
+
+ @Override
+ public void put(K key, V value) {
+ jedis.set(keySerializer.write(key), valueSerializer.write(value));
}
-
- public String get(String key){
- return jedis.get(key);
+
+ @Override
+ public V get(K key) {
+ return valueSerializer.read(jedis.get(keySerializer.write(key)));
}
-
- public void remove(String key){
- jedis.del(key);
+
+ @Override
+ public void remove(K key) {
+ jedis.del(keySerializer.write(key));
}
-
- public RedisStateIterator getIterator(){
- return new RedisStateIterator(jedis);
+
+ @Override
+ public DBStateIterator<K, V> getIterator() {
+ return new RedisStateIterator();
}
+ private class RedisStateIterator extends DBStateIterator<K, V> {
+
+ private Set<byte[]> set;
+ private Iterator<byte[]> iterator;
+ private byte[] currentKey;
+
+ public RedisStateIterator() {
+ set = jedis.keys(new byte[0]);
+ jedis.keys("*".getBytes()).iterator();
+ iterator = set.iterator();
+ currentKey = iterator.next();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public K getNextKey() {
+ return keySerializer.read(currentKey);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public V getNextValue() {
+ return (V) jedis.get(currentKey);
+ }
+
+ @Override
+ public void next() {
+ currentKey = iterator.next();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java
deleted file mode 100644
index 121012e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisStateIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import java.util.Iterator;
-
-import redis.clients.jedis.Jedis;
-
-//Needs running Redis service
-public class RedisStateIterator implements DBStateIterator {
-
- private Iterator<String> iterator;
- private int position;
- private int size;
- private String currentKey;
- private Jedis jedis;
-
- public RedisStateIterator(Jedis jedis) {
- this.jedis = jedis;
- iterator = jedis.keys("*").iterator();
- size = jedis.keys("*").size();
- currentKey = iterator.next();
- position = 0;
- }
-
- public boolean hasNext() {
- return position != size;
- }
-
- public String getNextKey() {
- return currentKey;
- }
-
- public String getNextValue() {
- return jedis.get(currentKey);
- }
-
- public void next() {
- position += 1;
- if (position != size) {
- currentKey = iterator.next();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
new file mode 100644
index 0000000..15f5a60
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
@@ -0,0 +1,105 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class DBStateTest {
+
+ public void stateTest(DBState<String, Integer> state) {
+ state.put("k1", 1);
+ assertEquals(new Integer(1), state.get("k1"));
+
+ state.put("k2", 2);
+ state.put("k3", 3);
+ assertEquals(new Integer(2), state.get("k2"));
+ assertEquals(new Integer(3), state.get("k3"));
+ state.remove("k2");
+
+ try {
+ state.get("k2");
+ fail();
+ } catch (Exception e) {
+ }
+ }
+
+ private void iteratorTest(DBStateWithIterator<String, Integer> state) {
+ HashMap<String, Integer> expected = new HashMap<String, Integer>();
+ HashMap<String, Integer> result = new HashMap<String, Integer>();
+
+ state.put("10", 10);
+ state.put("20", 20);
+ state.put("30", 30);
+
+ expected.put("10", 10);
+ expected.put("20", 20);
+ expected.put("30", 30);
+
+ DBStateIterator<String, Integer> iterator = state.getIterator();
+ while (iterator.hasNext()) {
+ String key = iterator.getNextKey();
+ Integer value = iterator.getNextValue();
+ result.put(key, value);
+ iterator.next();
+ }
+ state.close();
+
+ assertEquals(expected, result);
+ }
+
+ // TODO
+ @Ignore("Creates files with no licenses")
+ @Test
+ public void levelDBTest() {
+ LevelDBState<String, Integer> state = new LevelDBState<String, Integer>("test");
+ stateTest(state);
+ state.close();
+
+ state = new LevelDBState<String, Integer>("test");
+ iteratorTest(state);
+ state.close();
+ }
+
+ @Ignore("Needs running Memcached")
+ @Test
+ public void memcachedTest() {
+ MemcachedState<Integer> state = new MemcachedState<Integer>();
+ stateTest(state);
+ state.close();
+ }
+
+ @Ignore("Needs running Redis")
+ @Test
+ public void redisTest() {
+ RedisState<String, Integer> state = new RedisState<String, Integer>();
+ stateTest(state);
+ state.close();
+
+ state = new RedisState<String, Integer>();
+ iteratorTest(state);
+ state.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java
deleted file mode 100644
index a422940..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/LeveldbTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class LeveldbTest {
- @Ignore("Not yet ready")
- @Test
- public void databaseTest() {
- LeveldbState state = new LeveldbState("test");
- state.put("hello", "world");
- System.out.println(state.get("hello"));
- state.put("big", "data");
- state.put("flink", "streaming");
- LeveldbStateIterator iterator = state.getIterator();
- while (iterator.hasNext()) {
- String key = iterator.getNextKey();
- String value = iterator.getNextValue();
- System.out.println("key=" + key + ", value=" + value);
- iterator.next();
- }
- state.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java
deleted file mode 100644
index 37fed86..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/MemcachedTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class MemcachedTest {
- @Ignore("Needs running Memcached service, not yet ready")
- @Test
- public void databaseState() {
- MemcachedState state = new MemcachedState();
- state.put("hello", "world");
- state.put("big", "data");
- state.put("flink", "streaming");
- System.out.println(state.get("hello"));
- System.out.println(state.get("big"));
- System.out.println(state.get("flink"));
- state.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce55dc34/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java
deleted file mode 100644
index 5adf93f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/RedisTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class RedisTest {
- @Ignore("Needs running Redis service, not yet ready")
- @Test
- public void databaseTest() {
- RedisState state = new RedisState();
- state.put("hello", "world");
- System.out.println(state.get("hello"));
- state.put("big", "data");
- state.put("flink", "streaming");
- RedisStateIterator iterator = state.getIterator();
- while (iterator.hasNext()) {
- String key = iterator.getNextKey();
- String value = iterator.getNextValue();
- System.out.println("key=" + key + ", value=" + value);
- iterator.next();
- }
- state.close();
- }
-}