You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/06/04 14:28:06 UTC

[1/2] flink git commit: [streaming] Minor cleanups on data stream.

Repository: flink
Updated Branches:
  refs/heads/master 86692e55f -> 8e49e7b5b


[streaming] Minor cleanups on data stream.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e49e7b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e49e7b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e49e7b5

Branch: refs/heads/master
Commit: 8e49e7b5b7b3f889b45c5bba3a4164540b2a507e
Parents: b5e5591
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jun 4 13:01:39 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jun 4 14:27:00 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 46 +++++++-------------
 .../flink/streaming/api/scala/DataStream.scala  | 23 +++++-----
 2 files changed, 28 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e49e7b5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f4b2088..296df13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -131,8 +131,7 @@ public class DataStream<OUT> {
 	 * @param typeInfo
 	 *            Type of the datastream
 	 */
-	public DataStream(StreamExecutionEnvironment environment, String operatorType,
-			TypeInformation<OUT> typeInfo) {
+	public DataStream(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> typeInfo) {
 		if (environment == null) {
 			throw new NullPointerException("context is null");
 		}
@@ -1025,9 +1024,7 @@ public class DataStream<OUT> {
 	 */
 	public DataStreamSink<OUT> print() {
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
-		DataStreamSink<OUT> returnStream = addSink(printFunction);
-
-		return returnStream;
+		return addSink(printFunction);
 	}
 
 	/**
@@ -1039,9 +1036,7 @@ public class DataStream<OUT> {
 	 */
 	public DataStreamSink<OUT> printToErr() {
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(true);
-		DataStreamSink<OUT> returnStream = addSink(printFunction);
-
-		return returnStream;
+		return addSink(printFunction);
 	}
 
 	/**
@@ -1055,7 +1050,7 @@ public class DataStream<OUT> {
 	 * @return the closed DataStream.
 	 */
 	public DataStreamSink<OUT> writeAsText(String path) {
-		return writeToFile(new TextOutputFormat<OUT>(new Path(path)), 0L);
+		return write(new TextOutputFormat<OUT>(new Path(path)), 0L);
 	}
 
 	/**
@@ -1073,7 +1068,7 @@ public class DataStream<OUT> {
 	 */
 	public DataStreamSink<OUT> writeAsText(String path, long millis) {
 		TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
-		return writeToFile(tof, millis);
+		return write(tof, millis);
 	}
 
 	/**
@@ -1092,7 +1087,7 @@ public class DataStream<OUT> {
 	public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode) {
 		TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
 		tof.setWriteMode(writeMode);
-		return writeToFile(tof, 0L);
+		return write(tof, 0L);
 	}
 
 	/**
@@ -1113,7 +1108,7 @@ public class DataStream<OUT> {
 	public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode, long millis) {
 		TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
 		tof.setWriteMode(writeMode);
-		return writeToFile(tof, millis);
+		return write(tof, millis);
 	}
 
 	/**
@@ -1132,7 +1127,7 @@ public class DataStream<OUT> {
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		return writeToFile((FileOutputFormat<OUT>) of, 0L);
+		return write((OutputFormat<OUT>) of, 0L);
 	}
 
 	/**
@@ -1154,7 +1149,7 @@ public class DataStream<OUT> {
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		return writeToFile((FileOutputFormat<OUT>) of, millis);
+		return write((OutputFormat<OUT>) of, millis);
 	}
 
 	/**
@@ -1179,7 +1174,7 @@ public class DataStream<OUT> {
 		if (writeMode != null) {
 			of.setWriteMode(writeMode);
 		}
-		return writeToFile((FileOutputFormat<OUT>) of, 0L);
+		return write((OutputFormat<OUT>) of, 0L);
 	}
 
 	/**
@@ -1208,7 +1203,7 @@ public class DataStream<OUT> {
 		if (writeMode != null) {
 			of.setWriteMode(writeMode);
 		}
-		return writeToFile((FileOutputFormat<OUT>) of, millis);
+		return write((OutputFormat<OUT>) of, millis);
 	}
 
 	/**
@@ -1223,35 +1218,26 @@ public class DataStream<OUT> {
 	 *            schema for serialization
 	 * @return the closed DataStream
 	 */
-	public DataStreamSink<OUT> writeToSocket(String hostName, int port,
-			SerializationSchema<OUT, byte[]> schema) {
+	public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema) {
 		DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
 		returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
 		return returnStream;
 	}
-
-	public DataStreamSink<OUT> writeToFile(FileOutputFormat<OUT> format, long millis) {
-		return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
-	}
 	
 	/**
-	 * Writes the dataStream into an output.
+	 * Writes the dataStream into an output, described by an OutputFormat.
+	 * 
 	 * @param format The output format
 	 * @param millis the write frequency
-	 * @return the closed DataStream
+	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> write(OutputFormat<OUT> format, long millis) {
 		return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
 	}
 
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
-
 		StreamReduce<OUT> operator = new StreamReduce<OUT>(aggregate);
-
-		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Aggregation", getType(),
-				operator);
-
-		return returnStream;
+		return transform("Aggregation", getType(), operator);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8e49e7b5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3b2183b..b29c6d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -50,7 +50,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def getJavaStream: JavaStream[T] = javaStream
 
   /**
-   * Returns the ID of the {@link DataStream}.
+   * Returns the ID of the DataStream.
    *
    * @return ID of the DataStream
    */
@@ -59,7 +59,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Returns the TypeInformation for the elements of this DataStream.
    */
-  def getType(): TypeInformation[T] = javaStream.getType
+  def getType(): TypeInformation[T] = javaStream.getType()
 
   /**
    * Sets the parallelism of this operation. This must be at least 1.
@@ -108,7 +108,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Turns off chaining for this operator so thread co-location will not be
    * used as an optimization. </p> Chaining can be turned off for the whole
-   * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
+   * job by [[StreamExecutionEnvironment.disableOperatorChaining()]]
    * however it is not advised for performance considerations.
    * 
    */
@@ -704,16 +704,17 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * written.
    *
    */
-  def print(): DataStream[T] = javaStream.print
+  def print(): DataStream[T] = javaStream.print()
 
   /**
-   * Writes a DataStream to the standard output stream (stderr).<br>
+   * Writes a DataStream to the standard error stream (stderr).
+   * 
    * For each element of the DataStream the result of
-   * {@link Object#toString()} is written.
+   * [[AnyRef.toString()]] is written.
    *
    * @return The closed DataStream.
    */
-  def printToErr() = javaStream.printToErr
+  def printToErr() = javaStream.printToErr()
 
   /**
    * Writes a DataStream to the file specified by path in text format. The
@@ -743,20 +744,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (writeMode != null) {
       of.setWriteMode(writeMode)
     }
-    javaStream.writeToFile(of.asInstanceOf[OutputFormat[T]], millis)
+    javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
   }
 
   /**
    * Writes a DataStream using the given [[OutputFormat]]. The
    * writing is performed periodically, in every millis milliseconds.
    */
-  def writeToFile(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
-    javaStream.writeToFile(format, millis)
+  def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
+    javaStream.write(format, millis)
   }
 
   /**
    * Writes the DataStream to a socket as a byte array. The format of the output is
-   * specified by a {@link SerializationSchema}.
+   * specified by a [[SerializationSchema]].
    */
   def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T, Array[Byte]]):
     DataStream[T] = javaStream.writeToSocket(hostname, port, schema)


[2/2] flink git commit: [hbase] Add a stream data sink for HBase

Posted by se...@apache.org.
[hbase] Add a stream data sink for HBase

This closes #706


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5e55918
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5e55918
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5e55918

Branch: refs/heads/master
Commit: b5e55918c0e6f51df00d2d39548d12f1b7710aa5
Parents: 86692e5
Author: yildirim <yi...@gronau.neofonie.priv>
Authored: Thu May 21 12:28:15 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jun 4 14:27:00 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-hbase/pom.xml               |   6 +
 .../hbase/example/HBaseWriteStreamExample.java  | 114 +++++++++++++++++++
 .../streaming/api/datastream/DataStream.java    |  26 ++---
 3 files changed, 133 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5e55918/flink-staging/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/pom.xml b/flink-staging/flink-hbase/pom.xml
index 805ff4b..1a5a1db 100644
--- a/flink-staging/flink-hbase/pom.xml
+++ b/flink-staging/flink-hbase/pom.xml
@@ -59,6 +59,12 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e55918/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
new file mode 100644
index 0000000..1e113c0
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * 
+ * This is an example of how to write streams into HBase. In this example the
+ * stream will be written into a local HBase, but it is possible to adapt this
+ * example for an HBase cluster running in the cloud. You need a running local HBase with a
+ * table "flinkExample" that has a column family "entry". If your HBase configuration does
+ * not match the hbase-site.xml in the resource folder, you have to temporarily delete this
+ * hbase-site.xml to execute the example properly.
+ * 
+ */
+public class HBaseWriteStreamExample {
+	/** Streams random numbers into the HBase table "flinkExample" until the job is cancelled. */
+	public static void main(String[] args) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
+
+		// endless source of random whole numbers in [0, 99], rendered as strings such as "42.0"
+		DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return false; // never finishes on its own
+			}
+
+			@Override
+			public String next() throws Exception {
+				return String.valueOf(Math.floor(Math.random() * 100));
+			}
+
+		});
+		dataStream.write(new HBaseOutputFormat(), 0L);
+
+		try {
+			env.execute();
+		} catch (Exception e) {
+			// propagate the failure instead of swallowing it
+			throw new RuntimeException("HBase write stream example failed", e);
+		}
+	}
+
+	/**
+	 * OutputFormat that writes each String record into the HBase table "flinkExample",
+	 * under column family "entry" and qualifier "entry". Row keys have the form
+	 * "<taskNumber>-<rowNumber>" so that parallel instances cannot produce the same key.
+	 */
+	private static class HBaseOutputFormat implements OutputFormat<String> {
+
+		private transient org.apache.hadoop.conf.Configuration conf = null;
+		private transient HTable table = null;
+		private String taskNumber = null;
+		private int rowNumber = 0;
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void configure(Configuration parameters) {
+			conf = HBaseConfiguration.create();
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+			table = new HTable(conf, "flinkExample");
+			this.taskNumber = String.valueOf(taskNumber);
+		}
+
+		@Override
+		public void writeRecord(String record) throws IOException {
+			Put put = new Put(Bytes.toBytes(taskNumber + "-" + rowNumber));
+			put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
+					Bytes.toBytes(record)); // store the record itself, not the row counter
+			rowNumber++;
+			table.put(put);
+		}
+
+		@Override
+		public void close() throws IOException {
+			if (table != null) { // guard in case open() failed before the table was created
+				try { table.flushCommits(); } finally { table.close(); }
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e55918/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index e0a8a7a..f4b2088 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1132,7 +1132,7 @@ public class DataStream<OUT> {
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		return writeToFile((OutputFormat<OUT>) of, 0L);
+		return writeToFile((FileOutputFormat<OUT>) of, 0L);
 	}
 
 	/**
@@ -1154,7 +1154,7 @@ public class DataStream<OUT> {
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		return writeToFile((OutputFormat<OUT>) of, millis);
+		return writeToFile((FileOutputFormat<OUT>) of, millis);
 	}
 
 	/**
@@ -1179,7 +1179,7 @@ public class DataStream<OUT> {
 		if (writeMode != null) {
 			of.setWriteMode(writeMode);
 		}
-		return writeToFile((OutputFormat<OUT>) of, 0L);
+		return writeToFile((FileOutputFormat<OUT>) of, 0L);
 	}
 
 	/**
@@ -1208,7 +1208,7 @@ public class DataStream<OUT> {
 		if (writeMode != null) {
 			of.setWriteMode(writeMode);
 		}
-		return writeToFile((OutputFormat<OUT>) of, millis);
+		return writeToFile((FileOutputFormat<OUT>) of, millis);
 	}
 
 	/**
@@ -1230,18 +1230,18 @@ public class DataStream<OUT> {
 		return returnStream;
 	}
 
+	public DataStreamSink<OUT> writeToFile(FileOutputFormat<OUT> format, long millis) {
+		return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
+	}
+	
 	/**
-	 * Writes a DataStream using the given {@link OutputFormat}. The
-	 * writing is performed periodically, in every millis milliseconds.
-	 *
-	 * @param format The output format that should be used for writing.
-	 * @param millis the file update frequency
-	 *
+	 * Writes the dataStream into an output.
+	 * @param format The output format
+	 * @param millis the write frequency
 	 * @return the closed DataStream
 	 */
-	public DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
-		DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
-		return returnStream;
+	public DataStreamSink<OUT> write(OutputFormat<OUT> format, long millis) {
+		return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
 	}
 
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {