You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/06/04 14:28:06 UTC
[1/2] flink git commit: [streaming] Minor cleanups on data stream.
Repository: flink
Updated Branches:
refs/heads/master 86692e55f -> 8e49e7b5b
[streaming] Minor cleanups on data stream.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e49e7b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e49e7b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e49e7b5
Branch: refs/heads/master
Commit: 8e49e7b5b7b3f889b45c5bba3a4164540b2a507e
Parents: b5e5591
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jun 4 13:01:39 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jun 4 14:27:00 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 46 +++++++-------------
.../flink/streaming/api/scala/DataStream.scala | 23 +++++-----
2 files changed, 28 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e49e7b5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f4b2088..296df13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -131,8 +131,7 @@ public class DataStream<OUT> {
* @param typeInfo
* Type of the datastream
*/
- public DataStream(StreamExecutionEnvironment environment, String operatorType,
- TypeInformation<OUT> typeInfo) {
+ public DataStream(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> typeInfo) {
if (environment == null) {
throw new NullPointerException("context is null");
}
@@ -1025,9 +1024,7 @@ public class DataStream<OUT> {
*/
public DataStreamSink<OUT> print() {
PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
- DataStreamSink<OUT> returnStream = addSink(printFunction);
-
- return returnStream;
+ return addSink(printFunction);
}
/**
@@ -1039,9 +1036,7 @@ public class DataStream<OUT> {
*/
public DataStreamSink<OUT> printToErr() {
PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(true);
- DataStreamSink<OUT> returnStream = addSink(printFunction);
-
- return returnStream;
+ return addSink(printFunction);
}
/**
@@ -1055,7 +1050,7 @@ public class DataStream<OUT> {
* @return the closed DataStream.
*/
public DataStreamSink<OUT> writeAsText(String path) {
- return writeToFile(new TextOutputFormat<OUT>(new Path(path)), 0L);
+ return write(new TextOutputFormat<OUT>(new Path(path)), 0L);
}
/**
@@ -1073,7 +1068,7 @@ public class DataStream<OUT> {
*/
public DataStreamSink<OUT> writeAsText(String path, long millis) {
TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
- return writeToFile(tof, millis);
+ return write(tof, millis);
}
/**
@@ -1092,7 +1087,7 @@ public class DataStream<OUT> {
public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode) {
TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
tof.setWriteMode(writeMode);
- return writeToFile(tof, 0L);
+ return write(tof, 0L);
}
/**
@@ -1113,7 +1108,7 @@ public class DataStream<OUT> {
public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode, long millis) {
TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
tof.setWriteMode(writeMode);
- return writeToFile(tof, millis);
+ return write(tof, millis);
}
/**
@@ -1132,7 +1127,7 @@ public class DataStream<OUT> {
"The writeAsCsv() method can only be used on data sets of tuples.");
CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
- return writeToFile((FileOutputFormat<OUT>) of, 0L);
+ return write((OutputFormat<OUT>) of, 0L);
}
/**
@@ -1154,7 +1149,7 @@ public class DataStream<OUT> {
"The writeAsCsv() method can only be used on data sets of tuples.");
CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
- return writeToFile((FileOutputFormat<OUT>) of, millis);
+ return write((OutputFormat<OUT>) of, millis);
}
/**
@@ -1179,7 +1174,7 @@ public class DataStream<OUT> {
if (writeMode != null) {
of.setWriteMode(writeMode);
}
- return writeToFile((FileOutputFormat<OUT>) of, 0L);
+ return write((OutputFormat<OUT>) of, 0L);
}
/**
@@ -1208,7 +1203,7 @@ public class DataStream<OUT> {
if (writeMode != null) {
of.setWriteMode(writeMode);
}
- return writeToFile((FileOutputFormat<OUT>) of, millis);
+ return write((OutputFormat<OUT>) of, millis);
}
/**
@@ -1223,35 +1218,26 @@ public class DataStream<OUT> {
* schema for serialization
* @return the closed DataStream
*/
- public DataStreamSink<OUT> writeToSocket(String hostName, int port,
- SerializationSchema<OUT, byte[]> schema) {
+ public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema) {
DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
return returnStream;
}
-
- public DataStreamSink<OUT> writeToFile(FileOutputFormat<OUT> format, long millis) {
- return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
- }
/**
- * Writes the dataStream into an output.
+ * Writes the dataStream into an output, described by an OutputFormat.
+ *
* @param format The output format
* @param millis the write frequency
- * @return the closed DataStream
+ * @return The closed DataStream
*/
public DataStreamSink<OUT> write(OutputFormat<OUT> format, long millis) {
return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
}
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
-
StreamReduce<OUT> operator = new StreamReduce<OUT>(aggregate);
-
- SingleOutputStreamOperator<OUT, ?> returnStream = transform("Aggregation", getType(),
- operator);
-
- return returnStream;
+ return transform("Aggregation", getType(), operator);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8e49e7b5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3b2183b..b29c6d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -50,7 +50,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
def getJavaStream: JavaStream[T] = javaStream
/**
- * Returns the ID of the {@link DataStream}.
+ * Returns the ID of the DataStream.
*
* @return ID of the DataStream
*/
@@ -59,7 +59,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Returns the TypeInformation for the elements of this DataStream.
*/
- def getType(): TypeInformation[T] = javaStream.getType
+ def getType(): TypeInformation[T] = javaStream.getType()
/**
* Sets the parallelism of this operation. This must be at least 1.
@@ -108,7 +108,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Turns off chaining for this operator so thread co-location will not be
* used as an optimization. </p> Chaining can be turned off for the whole
- * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
+ * job by [[StreamExecutionEnvironment.disableOperatorChaining()]]
* however it is not advised for performance considerations.
*
*/
@@ -704,16 +704,17 @@ class DataStream[T](javaStream: JavaStream[T]) {
* written.
*
*/
- def print(): DataStream[T] = javaStream.print
+ def print(): DataStream[T] = javaStream.print()
/**
- * Writes a DataStream to the standard output stream (stderr).<br>
+ * Writes a DataStream to the standard error stream (stderr).
+ *
* For each element of the DataStream the result of
- * {@link Object#toString()} is written.
+ * [[AnyRef.toString()]] is written.
*
* @return The closed DataStream.
*/
- def printToErr() = javaStream.printToErr
+ def printToErr() = javaStream.printToErr()
/**
* Writes a DataStream to the file specified by path in text format. The
@@ -743,20 +744,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
if (writeMode != null) {
of.setWriteMode(writeMode)
}
- javaStream.writeToFile(of.asInstanceOf[OutputFormat[T]], millis)
+ javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
}
/**
* Writes a DataStream using the given [[OutputFormat]]. The
* writing is performed periodically, in every millis milliseconds.
*/
- def writeToFile(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
- javaStream.writeToFile(format, millis)
+ def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
+ javaStream.write(format, millis)
}
/**
* Writes the DataStream to a socket as a byte array. The format of the output is
- * specified by a {@link SerializationSchema}.
+ * specified by a [[SerializationSchema]].
*/
def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T, Array[Byte]]):
DataStream[T] = javaStream.writeToSocket(hostname, port, schema)
[2/2] flink git commit: [hbase] Add a stream data sink for HBase
Posted by se...@apache.org.
[hbase] Add a stream data sink for HBase
This closes #706
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5e55918
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5e55918
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5e55918
Branch: refs/heads/master
Commit: b5e55918c0e6f51df00d2d39548d12f1b7710aa5
Parents: 86692e5
Author: yildirim <yi...@gronau.neofonie.priv>
Authored: Thu May 21 12:28:15 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jun 4 14:27:00 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-hbase/pom.xml | 6 +
.../hbase/example/HBaseWriteStreamExample.java | 114 +++++++++++++++++++
.../streaming/api/datastream/DataStream.java | 26 ++---
3 files changed, 133 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e55918/flink-staging/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/pom.xml b/flink-staging/flink-hbase/pom.xml
index 805ff4b..1a5a1db 100644
--- a/flink-staging/flink-hbase/pom.xml
+++ b/flink-staging/flink-hbase/pom.xml
@@ -59,6 +59,12 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e55918/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
new file mode 100644
index 0000000..1e113c0
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ *
+ * This is an example of how to write streams into HBase. In this example the
+ * stream will be written into a local HBase, but it is possible to adapt this
+ * example for an HBase running in a cloud. You need a running local HBase with a
+ * table "flinkExample" and a column "entry". If your HBase configuration does
+ * not fit the hbase-site.xml in the resource folder, then you have to temporarily delete this
+ * hbase-site.xml to execute the example properly.
+ *
+ */
+public class HBaseWriteStreamExample {
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment();
+
+ // data stream with random numbers
+ DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public String next() throws Exception {
+ return String.valueOf(Math.floor(Math.random() * 100));
+ }
+
+ });
+ dataStream.write(new HBaseOutputFormat(), 0L);
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ *
+ * This class implements an OutputFormat for HBase
+ *
+ */
+ private static class HBaseOutputFormat implements OutputFormat<String> {
+
+ private org.apache.hadoop.conf.Configuration conf = null;
+ private HTable table = null;
+ private String taskNumber = null;
+ private int rowNumber = 0;
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void configure(Configuration parameters) {
+ conf = HBaseConfiguration.create();
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ table = new HTable(conf, "flinkExample");
+ this.taskNumber = String.valueOf(taskNumber);
+ }
+
+ @Override
+ public void writeRecord(String record) throws IOException {
+ Put put = new Put(Bytes.toBytes(taskNumber + rowNumber));
+ put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
+ Bytes.toBytes(rowNumber));
+ rowNumber++;
+ table.put(put);
+ }
+
+ @Override
+ public void close() throws IOException {
+ table.flushCommits();
+ table.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e55918/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index e0a8a7a..f4b2088 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1132,7 +1132,7 @@ public class DataStream<OUT> {
"The writeAsCsv() method can only be used on data sets of tuples.");
CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
- return writeToFile((OutputFormat<OUT>) of, 0L);
+ return writeToFile((FileOutputFormat<OUT>) of, 0L);
}
/**
@@ -1154,7 +1154,7 @@ public class DataStream<OUT> {
"The writeAsCsv() method can only be used on data sets of tuples.");
CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
- return writeToFile((OutputFormat<OUT>) of, millis);
+ return writeToFile((FileOutputFormat<OUT>) of, millis);
}
/**
@@ -1179,7 +1179,7 @@ public class DataStream<OUT> {
if (writeMode != null) {
of.setWriteMode(writeMode);
}
- return writeToFile((OutputFormat<OUT>) of, 0L);
+ return writeToFile((FileOutputFormat<OUT>) of, 0L);
}
/**
@@ -1208,7 +1208,7 @@ public class DataStream<OUT> {
if (writeMode != null) {
of.setWriteMode(writeMode);
}
- return writeToFile((OutputFormat<OUT>) of, millis);
+ return writeToFile((FileOutputFormat<OUT>) of, millis);
}
/**
@@ -1230,18 +1230,18 @@ public class DataStream<OUT> {
return returnStream;
}
+ public DataStreamSink<OUT> writeToFile(FileOutputFormat<OUT> format, long millis) {
+ return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
+ }
+
/**
- * Writes a DataStream using the given {@link OutputFormat}. The
- * writing is performed periodically, in every millis milliseconds.
- *
- * @param format The output format that should be used for writing.
- * @param millis the file update frequency
- *
+ * Writes the dataStream into an output.
+ * @param format The output format
+ * @param millis the write frequency
* @return the closed DataStream
*/
- public DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
- DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
- return returnStream;
+ public DataStreamSink<OUT> write(OutputFormat<OUT> format, long millis) {
+ return addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
}
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {