You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:35 UTC

[02/28] git commit: [streaming] Added database-backed state

[streaming] Added database-backed state


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/75b9bc85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/75b9bc85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/75b9bc85

Branch: refs/heads/master
Commit: 75b9bc85c08d2bcaa0f806ae126562bded2e71a4
Parents: 7596012
Author: Yingjun Wu <wu...@gmail.com>
Authored: Fri Aug 8 03:15:49 2014 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:56 2014 +0200

----------------------------------------------------------------------
 .../flink-streaming-core/pom.xml                | 14 ++++
 .../streaming/state/database/LeveldbState.java  | 69 ++++++++++++++++++++
 .../state/database/LeveldbStateIterator.java    | 48 ++++++++++++++
 .../streaming/state/database/RedisState.java    | 53 +++++++++++++++
 .../state/database/RedisStateIterator.java      | 59 +++++++++++++++++
 .../streaming/state/database/DatabaseTest.java  | 59 +++++++++++++++++
 6 files changed, 302 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index 2c4abc6..0b74e27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -42,6 +42,20 @@ under the License.
 			<version>2.0.6</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.fusesource.leveldbjni</groupId>
+			<artifactId>leveldbjni-all</artifactId>
+			<version>1.8</version>
+		</dependency>
+
+		<dependency>
+			<groupId>redis.clients</groupId>
+			<artifactId>jedis</artifactId>
+			<version>2.4.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java
new file mode 100644
index 0000000..3a66ac9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbState.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+import static org.fusesource.leveldbjni.JniDBFactory.factory;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+
+public class LeveldbState {
+	
+	private DB database;
+	public LeveldbState(String dbName){
+		Options options = new Options();
+		options.createIfMissing(true);
+		try {
+			database = factory.open(new File(dbName), options);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+	
+	public void close(){
+		try {
+			database.close();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+	
+	public void setTuple(String key, String value){
+		database.put(bytes(key), bytes(value));
+	}
+	
+	public String getTuple(String key){
+		return asString(database.get(bytes(key)));
+	}
+	
+	public void deleteTuple(String key){
+		database.delete(bytes(key));
+	}
+	
+	public LeveldbStateIterator getIterator(){
+		return new LeveldbStateIterator(database.iterator());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java
new file mode 100644
index 0000000..66c24d0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/LeveldbStateIterator.java
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+
+import org.iq80.leveldb.DBIterator;
+
+public class LeveldbStateIterator {
+	private DBIterator iterator;
+	public LeveldbStateIterator(DBIterator iter){
+		this.iterator=iter;
+		this.iterator.seekToFirst();
+	}
+	
+	public boolean hasNext(){
+		return iterator.hasNext();
+	}
+	
+	public String getNextKey(){
+		return asString(iterator.peekNext().getKey());
+	}
+	
+	public String getNextValue(){
+		return asString(iterator.peekNext().getValue());
+	}
+	
+	public void next(){
+		iterator.next();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java
new file mode 100644
index 0000000..61c48b7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisState.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import redis.clients.jedis.Jedis;
+
+//this is the redis-supported state. To use this state, the users are required to boot their redis server first.
+public class RedisState {
+	
+	private Jedis jedis;
+	
+	public RedisState(){
+		jedis = new Jedis("localhost");
+	}
+	
+	public void close(){
+		jedis.close();
+	}
+	
+	public void setTuple(String key, String value){
+		jedis.set(key, value);
+	}
+	
+	public String getTuple(String key){
+		return jedis.get(key);
+	}
+	
+	public void deleteTuple(String key){
+		jedis.del(key);
+	}
+	
+	public RedisStateIterator getIterator(){
+		return new RedisStateIterator(jedis);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java
new file mode 100644
index 0000000..7d8c4fb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/database/RedisStateIterator.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import java.util.Iterator;
+
+import redis.clients.jedis.Jedis;
+
+public class RedisStateIterator {
+	
+	private Iterator<String> iterator;
+	private int position;
+	private int size;
+	private String currentKey;
+	private Jedis jedis;
+	public RedisStateIterator(Jedis jedis){
+		this.jedis = jedis;
+		iterator = jedis.keys("*").iterator();
+		size = jedis.keys("*").size();
+		currentKey = iterator.next();
+		position = 0;
+	}
+	
+	public boolean hasNext(){
+		return position != size;
+	}
+	
+	public String getNextKey(){
+		return currentKey;
+	}
+	
+	public String getNextValue(){
+		return jedis.get(currentKey);
+	}
+	
+	public void next(){
+		position += 1;
+		if (position != size){
+			currentKey = iterator.next();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/75b9bc85/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java
new file mode 100644
index 0000000..00ae0b1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/database/DatabaseTest.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state.database;
+
+import org.junit.Test;
+
+public class DatabaseTest {
+	
+	@Test
+	public void LeveldbTest(){
+		LeveldbState state=new LeveldbState("test");
+		state.setTuple("hello", "world");
+		System.out.println(state.getTuple("hello"));
+		state.setTuple("big", "data");
+		state.setTuple("flink", "streaming");
+		LeveldbStateIterator iterator=state.getIterator();
+		while(iterator.hasNext()){
+			String key=iterator.getNextKey();
+			String value=iterator.getNextValue();
+			System.out.println("key="+key+", value="+value);
+			iterator.next();
+		}
+		state.close();
+	}
+	
+	@Test
+	public void RedisTest(){
+		RedisState state=new RedisState();
+		state.setTuple("hello", "world");
+		System.out.println(state.getTuple("hello"));
+		state.setTuple("big", "data");
+		state.setTuple("flink", "streaming");
+		RedisStateIterator iterator=state.getIterator();
+		while(iterator.hasNext()){
+			String key=iterator.getNextKey();
+			String value=iterator.getNextValue();
+			System.out.println("key="+key+", value="+value);
+			iterator.next();
+		}
+		state.close();
+	}
+}