You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:35 UTC
[02/28] git commit: [streaming] Added database-backed state
[streaming] Added database-backed state
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/75b9bc85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/75b9bc85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/75b9bc85
Branch: refs/heads/master
Commit: 75b9bc85c08d2bcaa0f806ae126562bded2e71a4
Parents: 7596012
Author: Yingjun Wu <wu...@gmail.com>
Authored: Fri Aug 8 03:15:49 2014 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:56 2014 +0200
----------------------------------------------------------------------
.../flink-streaming-core/pom.xml | 14 ++++
.../streaming/state/database/LeveldbState.java | 69 ++++++++++++++++++++
.../state/database/LeveldbStateIterator.java | 48 ++++++++++++++
.../streaming/state/database/RedisState.java | 53 +++++++++++++++
.../state/database/RedisStateIterator.java | 59 +++++++++++++++++
.../streaming/state/database/DatabaseTest.java | 59 +++++++++++++++++
6 files changed, 302 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index 2c4abc6..0b74e27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -42,6 +42,20 @@ under the License.
<version>2.0.6</version>
</dependency>
+ <!-- LevelDB JNI bindings; LeveldbState and LeveldbStateIterator import org.fusesource.leveldbjni.JniDBFactory. -->
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ <version>1.8</version>
+ </dependency>
+
+ <!-- Jedis Redis client; RedisState and RedisStateIterator import redis.clients.jedis.Jedis. -->
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>2.4.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java
new file mode 100644
index 0000000..3a66ac9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+import static org.fusesource.leveldbjni.JniDBFactory.factory;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+
+/**
+ * String key/value state stored in a LevelDB database on the local file system.
+ * Keys and values are converted with the leveldbjni {@code bytes} and
+ * {@code asString} helpers.
+ */
+public class LeveldbState {
+
+    private final DB database;
+
+    /**
+     * Opens the LevelDB database located at {@code dbName}, creating it if it
+     * does not exist yet.
+     *
+     * @param dbName file-system path of the database
+     * @throws RuntimeException if the database cannot be opened; the original
+     *         {@link IOException} is attached as the cause
+     */
+    public LeveldbState(String dbName) {
+        Options options = new Options();
+        options.createIfMissing(true);
+        try {
+            database = factory.open(new File(dbName), options);
+        } catch (IOException e) {
+            // Fail fast: continuing with a null handle would only postpone the
+            // failure to a NullPointerException in the first accessor call.
+            throw new RuntimeException("Could not open LevelDB database '" + dbName + "'", e);
+        }
+    }
+
+    /**
+     * Closes the underlying database. A failure while closing is printed to
+     * standard error and not propagated.
+     */
+    public void close() {
+        try {
+            database.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Stores {@code value} under {@code key}.
+     *
+     * @param key   key to write
+     * @param value value to associate with the key
+     */
+    public void setTuple(String key, String value) {
+        database.put(bytes(key), bytes(value));
+    }
+
+    /**
+     * Reads the value stored under {@code key}.
+     *
+     * @param key key to look up
+     * @return the stored value decoded with {@code asString}
+     */
+    public String getTuple(String key) {
+        // NOTE(review): the result for a missing key depends on how asString
+        // treats a null byte array - confirm against leveldbjni.
+        return asString(database.get(bytes(key)));
+    }
+
+    /**
+     * Removes the entry stored under {@code key}.
+     *
+     * @param key key to delete
+     */
+    public void deleteTuple(String key) {
+        database.delete(bytes(key));
+    }
+
+    /**
+     * Creates a new iterator over this database.
+     *
+     * @return a {@link LeveldbStateIterator} wrapping a fresh database iterator
+     */
+    public LeveldbStateIterator getIterator() {
+        // NOTE(review): the DBIterator created here is never closed by this
+        // class or by the wrapper; releasing it needs an API addition.
+        return new LeveldbStateIterator(database.iterator());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java
new file mode 100644
index 0000000..66c24d0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+
+import org.iq80.leveldb.DBIterator;
+
+/**
+ * Cursor-style wrapper around a LevelDB {@link DBIterator} that exposes the
+ * upcoming entry's key and value as strings. The entry is inspected with
+ * {@link #getNextKey()} / {@link #getNextValue()} and consumed with
+ * {@link #next()}.
+ */
+public class LeveldbStateIterator {
+
+    private final DBIterator iterator;
+
+    /**
+     * Wraps the given iterator and positions it at the first entry.
+     *
+     * @param iter database iterator to wrap
+     */
+    public LeveldbStateIterator(DBIterator iter) {
+        iter.seekToFirst();
+        this.iterator = iter;
+    }
+
+    /**
+     * @return whether the wrapped iterator reports another entry
+     */
+    public boolean hasNext() {
+        final boolean more = iterator.hasNext();
+        return more;
+    }
+
+    /**
+     * @return the key of the upcoming entry, without advancing
+     */
+    public String getNextKey() {
+        final byte[] rawKey = iterator.peekNext().getKey();
+        return asString(rawKey);
+    }
+
+    /**
+     * @return the value of the upcoming entry, without advancing
+     */
+    public String getNextValue() {
+        final byte[] rawValue = iterator.peekNext().getValue();
+        return asString(rawValue);
+    }
+
+    /**
+     * Advances the wrapped iterator past the upcoming entry.
+     */
+    public void next() {
+        iterator.next();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java
new file mode 100644
index 0000000..61c48b7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import redis.clients.jedis.Jedis;
+
+/**
+ * String key/value state stored in a Redis server and accessed through Jedis.
+ * The Redis server must already be running before this state is used.
+ */
+public class RedisState {
+
+    /** Host used by the no-argument constructor. */
+    private static final String DEFAULT_HOST = "localhost";
+
+    private final Jedis jedis;
+
+    /**
+     * Creates a state that talks to the Redis server on {@code localhost}.
+     */
+    public RedisState() {
+        this(DEFAULT_HOST);
+    }
+
+    /**
+     * Creates a state that talks to the Redis server on the given host.
+     *
+     * @param host host name of the Redis server
+     */
+    public RedisState(String host) {
+        jedis = new Jedis(host);
+    }
+
+    /**
+     * Closes the Jedis client held by this state.
+     */
+    public void close() {
+        jedis.close();
+    }
+
+    /**
+     * Stores {@code value} under {@code key}.
+     *
+     * @param key   key to write
+     * @param value value to associate with the key
+     */
+    public void setTuple(String key, String value) {
+        jedis.set(key, value);
+    }
+
+    /**
+     * Reads the value stored under {@code key}.
+     *
+     * @param key key to look up
+     * @return whatever {@code Jedis.get} returns for the key
+     */
+    public String getTuple(String key) {
+        return jedis.get(key);
+    }
+
+    /**
+     * Removes the entry stored under {@code key}.
+     *
+     * @param key key to delete
+     */
+    public void deleteTuple(String key) {
+        jedis.del(key);
+    }
+
+    /**
+     * Creates a new iterator that shares this state's Jedis client.
+     *
+     * @return a {@link RedisStateIterator} built on this state's client
+     */
+    public RedisStateIterator getIterator() {
+        return new RedisStateIterator(jedis);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java
new file mode 100644
index 0000000..7d8c4fb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import java.util.Iterator;
+
+import redis.clients.jedis.Jedis;
+
+/**
+ * Cursor over the keys returned by {@code KEYS *} on a Redis server. The
+ * current entry is inspected with {@link #getNextKey()} /
+ * {@link #getNextValue()} and consumed with {@link #next()}.
+ *
+ * <p>The key set is fetched once, at construction time; values are read from
+ * the server on demand.
+ */
+public class RedisStateIterator {
+
+    private final Jedis jedis;
+    private final Iterator<String> iterator;
+    // Key the cursor currently points at; null once every key was consumed
+    // (or when the server returned no keys at all).
+    private String currentKey;
+
+    /**
+     * Snapshots the server's keys and positions the cursor on the first one.
+     * An empty key set yields an iterator whose {@link #hasNext()} is false.
+     *
+     * @param jedis client used to list the keys and to read their values
+     */
+    public RedisStateIterator(Jedis jedis) {
+        this.jedis = jedis;
+        // One KEYS call only: asking twice (once for the iterator, once for
+        // the size) can give inconsistent answers if keys change in between.
+        this.iterator = jedis.keys("*").iterator();
+        advance();
+    }
+
+    /**
+     * @return true while the cursor points at a key
+     */
+    public boolean hasNext() {
+        return currentKey != null;
+    }
+
+    /**
+     * @return the key the cursor points at, without advancing
+     * @throws java.util.NoSuchElementException if all keys were consumed
+     */
+    public String getNextKey() {
+        return requireCurrentKey();
+    }
+
+    /**
+     * @return the value {@code Jedis.get} returns for the current key
+     * @throws java.util.NoSuchElementException if all keys were consumed
+     */
+    public String getNextValue() {
+        return jedis.get(requireCurrentKey());
+    }
+
+    /**
+     * Moves the cursor to the following key. Calling this after the last key
+     * was consumed leaves the iterator exhausted.
+     */
+    public void next() {
+        advance();
+    }
+
+    // Loads the following key, or marks the cursor as exhausted.
+    private void advance() {
+        currentKey = iterator.hasNext() ? iterator.next() : null;
+    }
+
+    // Returns the current key or fails when the cursor is exhausted.
+    private String requireCurrentKey() {
+        if (currentKey == null) {
+            throw new java.util.NoSuchElementException("No more keys");
+        }
+        return currentKey;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java
new file mode 100644
index 0000000..00ae0b1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import org.junit.Test;
+
+/**
+ * Smoke tests for the database-backed states: each test writes three tuples,
+ * checks they can be read back, and prints every entry seen by the iterator.
+ */
+public class DatabaseTest {
+
+    /**
+     * Exercises {@link LeveldbState} against a database named {@code "test"}.
+     */
+    @Test
+    public void LeveldbTest() {
+        LeveldbState state = new LeveldbState("test");
+        try {
+            state.setTuple("hello", "world");
+            System.out.println(state.getTuple("hello"));
+            state.setTuple("big", "data");
+            state.setTuple("flink", "streaming");
+
+            // Verify the writes instead of only printing them.
+            org.junit.Assert.assertEquals("world", state.getTuple("hello"));
+            org.junit.Assert.assertEquals("data", state.getTuple("big"));
+            org.junit.Assert.assertEquals("streaming", state.getTuple("flink"));
+
+            LeveldbStateIterator iterator = state.getIterator();
+            while (iterator.hasNext()) {
+                String key = iterator.getNextKey();
+                String value = iterator.getNextValue();
+                System.out.println("key=" + key + ", value=" + value);
+                iterator.next();
+            }
+        } finally {
+            // Close the state even when an assertion or a database call fails.
+            state.close();
+        }
+    }
+
+    /**
+     * Exercises {@link RedisState}; it connects through the no-argument
+     * constructor, so a Redis server must be reachable for this test to run.
+     */
+    @Test
+    public void RedisTest() {
+        RedisState state = new RedisState();
+        try {
+            state.setTuple("hello", "world");
+            System.out.println(state.getTuple("hello"));
+            state.setTuple("big", "data");
+            state.setTuple("flink", "streaming");
+
+            // Verify the writes instead of only printing them.
+            org.junit.Assert.assertEquals("world", state.getTuple("hello"));
+            org.junit.Assert.assertEquals("data", state.getTuple("big"));
+            org.junit.Assert.assertEquals("streaming", state.getTuple("flink"));
+
+            RedisStateIterator iterator = state.getIterator();
+            while (iterator.hasNext()) {
+                String key = iterator.getNextKey();
+                String value = iterator.getNextValue();
+                System.out.println("key=" + key + ", value=" + value);
+                iterator.next();
+            }
+        } finally {
+            // Close the state even when an assertion or a Redis call fails.
+            state.close();
+        }
+    }
+}