Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 14:01:29 UTC

[1/7] flink git commit: [FLINK-785] Fixed ObjectReuseITCase

Repository: flink
Updated Branches:
  refs/heads/master 318a14d85 -> 57615aaa1


[FLINK-785] Fixed ObjectReuseITCase

This closes #370


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57615aaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57615aaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57615aaa

Branch: refs/heads/master
Commit: 57615aaa19e9933e43ed0431a78dd231bf98b103
Parents: a3a7350
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Feb 11 18:02:10 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../flink/test/operators/ObjectReuseITCase.java | 58 ++++++++------------
 1 file changed, 23 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57615aaa/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
index fa1d58a..faf6de5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
@@ -43,18 +43,26 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class ObjectReuseITCase extends JavaProgramTestBase {
 
-	private static int NUM_PROGRAMS = 3;
+	private static int NUM_PROGRAMS = 4;
 
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
 	private String expectedResult;
 
+	private static String inReducePath;
+	private static String inGroupReducePath;
+
+	private String IN_REDUCE = "a,1\na,2\na,3\na,4\na,50\n";
+	private String IN_GROUP_REDUCE = "a,1\na,2\na,3\na,4\na,5\n";
+
 	public ObjectReuseITCase(Configuration config) {
 		super(config);
 	}
 	
 	@Override
 	protected void preSubmit() throws Exception {
+		inReducePath = createTempFile("in_reduce.txt", IN_REDUCE);
+		inGroupReducePath = createTempFile("in_group_reduce.txt", IN_GROUP_REDUCE);
 		resultPath = getTempDirPath("result");
 	}
 
@@ -100,13 +108,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.getConfig().enableObjectReuse();
 
-				DataSet<Tuple2<String, Integer>> input = env.fromElements(
-						new Tuple2<String, Integer>("a", 1),
-						new Tuple2<String, Integer>("a", 2),
-						new Tuple2<String, Integer>("a", 3),
-						new Tuple2<String, Integer>("a", 4),
-						new Tuple2<String, Integer>("a", 50));
-
+				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
 				DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
 
 					@Override
@@ -131,24 +133,20 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.getConfig().enableObjectReuse();
 
-				DataSet<Tuple2<String, Integer>> input = env.fromElements(
-						new Tuple2<String, Integer>("a", 1),
-						new Tuple2<String, Integer>("a", 2),
-						new Tuple2<String, Integer>("a", 3),
-						new Tuple2<String, Integer>("a", 4),
-						new Tuple2<String, Integer>("a", 50));
+				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
 
-				DataSet<Tuple2<String, Integer>> result = input.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+				DataSet<Tuple2<String, Integer>> result = input
+						.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
 
-					@Override
-					public Tuple2<String, Integer> reduce(
-							Tuple2<String, Integer> value1,
-							Tuple2<String, Integer> value2) throws Exception {
-						value2.f1 += value1.f1;
-						return value2;
-					}
+							@Override
+							public Tuple2<String, Integer> reduce(
+									Tuple2<String, Integer> value1,
+									Tuple2<String, Integer> value2) throws Exception {
+								value2.f1 += value1.f1;
+								return value2;
+							}
 
-				});
+						});
 
 				result.writeAsCsv(resultPath);
 				env.execute();
@@ -163,12 +161,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.getConfig().enableObjectReuse();
 
-				DataSet<Tuple2<String, Integer>> input = env.fromElements(
-						new Tuple2<String, Integer>("a", 1),
-						new Tuple2<String, Integer>("a", 2),
-						new Tuple2<String, Integer>("a", 3),
-						new Tuple2<String, Integer>("a", 4),
-						new Tuple2<String, Integer>("a", 5));
+				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
 
 				DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
 
@@ -203,12 +196,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.getConfig().enableObjectReuse();
 
-				DataSet<Tuple2<String, Integer>> input = env.fromElements(
-						new Tuple2<String, Integer>("a", 1),
-						new Tuple2<String, Integer>("a", 2),
-						new Tuple2<String, Integer>("a", 3),
-						new Tuple2<String, Integer>("a", 4),
-						new Tuple2<String, Integer>("a", 5));
+				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
 
 				DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
 

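For context, a minimal sketch (not part of the commit; the class name and file paths are placeholders) of the pattern the fixed test exercises. With object reuse enabled, the runtime may hand the same record instances to user functions, so a ReduceFunction may mutate and return one of its inputs; reading the input from a file at parallelism 1, as the test now does, keeps the outcome deterministic.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse(); // runtime may re-serve the same objects to UDFs

        // a single source split (parallelism 1) makes the record order deterministic
        DataSet<Tuple2<String, Integer>> input = env.readCsvFile("/path/to/in.csv")
                .types(String.class, Integer.class).setParallelism(1);

        DataSet<Tuple2<String, Integer>> result = input.groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                            Tuple2<String, Integer> value2) throws Exception {
                        value2.f1 += value1.f1; // mutating an input is permitted under object reuse
                        return value2;
                    }
                });

        result.writeAsCsv("/path/to/result");
        env.execute();
    }
}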

[4/7] flink git commit: [FLINK-2001] [ml] Fix DistanceMetric serialization error

Posted by se...@apache.org.
[FLINK-2001] [ml] Fix DistanceMetric serialization error

This closes #668


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40683718
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40683718
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40683718

Branch: refs/heads/master
Commit: 40683718db2233293ee932317102a399ad60a912
Parents: 318a14d
Author: Chiwan Park <ch...@icloud.com>
Authored: Tue May 12 15:11:25 2015 +0900
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../ml/metrics/distances/DistanceMetric.scala   |  2 +-
 .../metrics/distances/DistanceMetricSuite.scala | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40683718/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
index 1297ffb..21573fe 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
@@ -22,7 +22,7 @@ import org.apache.flink.ml.math.Vector
 
 /** DistanceMeasure interface is used for object which determines distance between two points.
   */
-trait DistanceMetric {
+trait DistanceMetric extends Serializable {
   /** Returns the distance between the arguments.
     *
     * @param a a Vector defining a multi-dimensional point in some space

http://git-wip-us.apache.org/repos/asf/flink/blob/40683718/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
index 569a1a8..1168d7f 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
@@ -18,8 +18,10 @@
 
 package org.apache.flink.ml.metrics.distances
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.flink.ml.math.DenseVector
-import org.scalatest.{Matchers, FlatSpec}
+import org.scalatest.{FlatSpec, Matchers}
 
 class DistanceMetricSuite extends FlatSpec with Matchers {
   val EPSILON = 1e-8
@@ -74,4 +76,20 @@ class DistanceMetricSuite extends FlatSpec with Matchers {
 
     TanimotoDistanceMetric().distance(vec1, vec2) should be(1 - (1.0 / (2 + 2 - 1)) +- EPSILON)
   }
+
+  it should "be serialized" in {
+    val metric = EuclideanDistanceMetric()
+    val byteOutput = new ByteArrayOutputStream()
+    val output = new ObjectOutputStream(byteOutput)
+
+    output.writeObject(metric)
+    output.close()
+
+    val byteInput = new ByteArrayInputStream(byteOutput.toByteArray)
+    val input = new ObjectInputStream(byteInput)
+
+    val restoredMetric = input.readObject().asInstanceOf[DistanceMetric]
+
+    restoredMetric should be(an[EuclideanDistanceMetric])
+  }
 }


[7/7] flink git commit: [FLINK-1928] [hbase] Fix license header for HBase tests

Posted by se...@apache.org.
[FLINK-1928] [hbase] Fix license header for HBase tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a771cfb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a771cfb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a771cfb6

Branch: refs/heads/master
Commit: a771cfb6d66a433fa362f32fe35b3085754eb282
Parents: 3be621c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 12 11:51:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../hbase/example/HBaseFlinkTestConstants.java    | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a771cfb6/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
index 5881020..8579dee 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.addons.hbase.example;
 
 public class HBaseFlinkTestConstants {


[3/7] flink git commit: [FLINK-1987][docs] Fixed broken links

Posted by se...@apache.org.
[FLINK-1987][docs] Fixed broken links

This closes #662


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cafb8769
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cafb8769
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cafb8769

Branch: refs/heads/master
Commit: cafb8769a22e21c1c6fe045670ed968bb1293f77
Parents: 4068371
Author: andralungu <lu...@gmail.com>
Authored: Thu May 7 22:08:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 docs/internals/add_operator.md | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cafb8769/docs/internals/add_operator.md
----------------------------------------------------------------------
diff --git a/docs/internals/add_operator.md b/docs/internals/add_operator.md
index 417178e..241304d 100644
--- a/docs/internals/add_operator.md
+++ b/docs/internals/add_operator.md
@@ -80,7 +80,7 @@ public static <T>DataSet<Long> count(DataSet<T> data) {
 
 A more complex example of an operation via specialization is the {% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java "Aggregation Operation" %} in the Java API. It is implemented by means of a *GroupReduce* UDF.
 
-The Aggregate Operation comes with its own operator in the *Java API*, but translates itself into a {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java "GroupReduceOperatorBase" %} in the *Common API*. (see [Program Life Cycle](internal_program_life_cycle.html) for details of how an operation from the *Java API* becomes an operation of the *Common API* and finally a runtime operation.)
+The Aggregate Operation comes with its own operator in the *Java API*, but translates itself into a {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java "GroupReduceOperatorBase" %} in the *Common API*.
 The Java API aggregation operator is only a builder that takes the types of aggregations and the field positions, and used that information to
 parameterize the GroupReduce UDF that performs the aggregations.
 
@@ -94,6 +94,7 @@ The DataSet offers a method for custom operators: `DataSet<X> runOperation(Custo
 The *CustomUnaryOperation* interface defines operators by means of the two functions:
 
 ~~~ java
+
 void setInput(DataSet<IN> inputData);
 	
 DataSet<OUT> createResult();
@@ -166,7 +167,7 @@ To learn how to implement a chained operator, take a look at the {% gh_link /fli
 
 ### Optimizer/Compiler
 
-This section does a minimal discussion of the important steps to add an operator. Please see the [Optimizer](internal_optimizer.html) docs for more detail on how the optimizer works.
+This section does a minimal discussion of the important steps to add an operator. Please see the {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java "Optimizer" %} for more details on how the optimizer works.
 To allow the optimizer to include a new operator in its planning, it needs a bit of information about it; in particular, the following information:
 
 - *{% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java "DriverStrategy" %}*: The operation needs to be added to the Enum, to make it available to the optimizer. The parameters to the Enum entry define which class implements the runtime operator, its chained version, whether the operator accumulates records (and needs memory for that), and whether it requires a comparator (works on keys). For our example, we can add the entry
@@ -174,10 +175,10 @@ To allow the optimizer to include a new operator in its planning, it needs a bit
 MAP_PARTITION(MapPartitionDriver.class, null /* or chained variant */, PIPELINED, false)
 ~~~
 
-- *Cost function*: The class {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java "CostEstimator" %} needs to know how expensive the operation is to the system. The costs here refer to the non-UDF part of the operator. Since the operator does essentially no work (it forwards the record stream to the UDF), the costs are zero. We change the `costOperator(...)` method by adding the *MAP_PARTITION* constant to the switch statement similar to the *MAP* constant such that no cost is accounted for it.
+- *Cost function*: The class {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java "CostEstimator" %} needs to know how expensive the operation is to the system. The costs here refer to the non-UDF part of the operator. Since the operator does essentially no work (it forwards the record stream to the UDF), the costs are zero. We change the `costOperator(...)` method by adding the *MAP_PARTITION* constant to the switch statement similar to the *MAP* constant such that no cost is accounted for it.
 
-- *OperatorDescriptor*: The operator descriptors define how an operation needs to be treated by the optimizer. They describe how the operation requires the input data to be (e.g., sorted or partitioned) and that way allows the optimizer to optimize the data movement, sorting, grouping in a global fashion. They do that by describing which {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java "RequestedGlobalProperties" %} (partitioning, replication, etc) and which {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java "RequestedLocalProperties" %} (sorting, grouping, uniqueness) the operator has, as well as how the operator affects the existing {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java "GlobalProperties" %} and {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java "LocalProperties" %}. In addition, it defines a few utility methods, for example to instantiate an operator candidate.
-Since the *mapPartition()* function is very simple (no requirements on partitioning/grouping), the descriptor is very simple. Other operators have more complex requirements, for example the {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java "GroupReduce" %}. Some operators, like *join* have multiple ways in which they can be executed and therefore have multiple descriptors ({% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildFirstProperties.java "Hash Join 1" %}, {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildSecondProperties.java "Hash Join 2" %}, {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java "SortMerge Join" %}).
+- *OperatorDescriptor*: The operator descriptors define how an operation needs to be treated by the optimizer. They describe how the operation requires the input data to be (e.g., sorted or partitioned) and that way allows the optimizer to optimize the data movement, sorting, grouping in a global fashion. They do that by describing which {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java "RequestedGlobalProperties" %} (partitioning, replication, etc) and which {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java "RequestedLocalProperties" %} (sorting, grouping, uniqueness) the operator has, as well as how the operator affects the existing {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java "GlobalProperties" %} and {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java "LocalProperties" %}. In addition, it defines a few utility methods, for example to instantiate an operator candidate.
+Since the *mapPartition()* function is very simple (no requirements on partitioning/grouping), the descriptor is very simple. Other operators have more complex requirements, for example the {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java "Hash Join 1" %}, {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java "Hash Join 2" %}, {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java "SortMerge Join" %}).
 The code sample below explains (with comments) how to create a descriptor for the *MapPartitionOperator* 
 
   ~~~ java
@@ -214,7 +215,7 @@ The code sample below explains (with comments) how to create a descriptor for th
     }
   ~~~
 
-- *OptimizerNode*: The optimizer node is the place where all comes together. It creates the list of *OperatorDescriptors*, implements the result data set size estimation, and assigns a name to the operation. It is a relatively small class and can be more or less copied again from the {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapNode.java "MapNode" %}.
+- *OptimizerNode*: The optimizer node is the place where all comes together. It creates the list of *OperatorDescriptors*, implements the result data set size estimation, and assigns a name to the operation. It is a relatively small class and can be more or less copied again from the {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java "MapNode" %}.
 
 
 ### Common API
@@ -223,7 +224,7 @@ To make the operation available to the higher-level APIs, it needs to be added t
 base operator. Create a class `MapPartitionOperatorBase`, after the pattern of the {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java "MapOperatorBase" %}.
 
 In addition, the optimizer needs to know which OptimizerNode how to create an OptimizerNode from the OperatorBase. This happens in the class
-`GraphCreatingVisitor` in the {% gh_link /flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java "Optimizer" %}.
+`GraphCreatingVisitor` in the {% gh_link /flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java "Optimizer" %}.
 
 *Note:* A pending idea is to allow to skip this step by unifying the OptimizerNode and the Common API operator. They essentially fulfill the
 same function. The Common API operator exists only in order for the `flink-java` and `flink-scala` packages to not have a dependency on the

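As a side note on the guide touched above: the custom-operator section of add_operator.md describes the CustomUnaryOperation hook (`DataSet<X> runOperation(CustomUnaryOperation<X, Y> operation)`). A minimal sketch of that two-method contract follows; the class name and its no-op behavior are purely illustrative.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CustomUnaryOperation;

// Hypothetical no-op operation, only to illustrate the contract described in the doc.
public class IdentityOperation<T> implements CustomUnaryOperation<T, T> {
    private DataSet<T> input;

    @Override
    public void setInput(DataSet<T> inputData) {
        this.input = inputData;
    }

    @Override
    public DataSet<T> createResult() {
        // a real operation would assemble its operator(s) here and return their result
        return this.input;
    }
}

// usage: DataSet<String> out = someDataSet.runOperation(new IdentityOperation<String>());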

[2/7] flink git commit: [runtime] Improve error messages on Task deployment

Posted by se...@apache.org.
[runtime] Improve error messages on Task deployment

This closes #615


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4152d75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4152d75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4152d75

Branch: refs/heads/master
Commit: b4152d75bf5236fbb3853b673af8e17f73cef5e2
Parents: a771cfb
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Apr 21 19:05:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/executiongraph/Execution.java   | 11 ++++++++---
 .../apache/flink/runtime/taskmanager/TaskManager.scala   |  2 ++
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4152d75/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 93b4f2f..4e046dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import scala.concurrent.Future;
@@ -330,7 +331,7 @@ public class Execution implements Serializable {
 			vertex.getExecutionGraph().registerExecution(this);
 
 			final Instance instance = slot.getInstance();
-			Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
+			final Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
 					new SubmitTask(deployment), new Timeout(timeout));
 
 			deployAction.onComplete(new OnComplete<Object>(){
@@ -339,9 +340,13 @@ public class Execution implements Serializable {
 				public void onComplete(Throwable failure, Object success) throws Throwable {
 					if (failure != null) {
 						if (failure instanceof TimeoutException) {
+							String taskname = Task.getTaskNameWithSubtaskAndID(deployment.getTaskName(),
+									deployment.getIndexInSubtaskGroup(), deployment.getNumberOfSubtasks(),
+									attemptId);
+							
 							markFailed(new Exception(
-									"Cannot deploy task - TaskManager " + instance + " not responding.",
-									failure));
+									"Cannot deploy task " + taskname + " - TaskManager (" + instance
+											+ ") not responding after a timeout of " + timeout, failure));
 						}
 						else {
 							markFailed(failure);

http://git-wip-us.apache.org/repos/asf/flink/blob/b4152d75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index bdefea6..2e580cc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -771,6 +771,8 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       val execId = tdd.getExecutionId
       val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
                           self, jobManagerActor, config.timeout, libCache, fileCache)
+
+      log.info(s"Received task ${task.getTaskNameWithSubtasks}")
       
       // add the task to the map
       val prevTask = runningTasks.put(execId, task)


[6/7] flink git commit: [FLINK-785] ChainedAllReduce

Posted by se...@apache.org.
[FLINK-785] ChainedAllReduce


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3a7350d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3a7350d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3a7350d

Branch: refs/heads/master
Commit: a3a7350d55a3e9b20a6c53aedd8d1c24fb188122
Parents: b4152d7
Author: zentol <s....@web.de>
Authored: Wed Feb 11 16:02:15 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/DriverStrategy.java |   3 +-
 .../chaining/ChainedAllReduceDriver.java        | 112 +++++++++++++++++++
 2 files changed, 114 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 7942b3b..4a0035c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
 import static org.apache.flink.runtime.operators.DamBehavior.FULL_DAM;
 import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING;
 import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED;
+import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver;
 
 import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
@@ -51,7 +52,7 @@ public enum DriverStrategy {
 	FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, 0),
 
 	// group everything together into one group and apply the Reduce function
-	ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, 0),
+	ALL_REDUCE(AllReduceDriver.class, ChainedAllReduceDriver.class, PIPELINED, 0),
 	// group everything together into one group and apply the GroupReduce function
 	ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, 0),
 	// group everything together into one group and apply the GroupReduce's combine function

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
new file mode 100644
index 0000000..4641fce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
+	private static final Logger LOG = LoggerFactory.getLogger(ChainedAllReduceDriver.class);
+
+	// --------------------------------------------------------------------------------------------
+	private ReduceFunction<IT> reducer;
+	private TypeSerializer<IT> serializer;
+
+	private IT base;
+
+	// --------------------------------------------------------------------------------------------
+	@Override
+	public void setup(AbstractInvokable parent) {
+		@SuppressWarnings("unchecked")
+		final ReduceFunction<IT> red = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, ReduceFunction.class);
+		this.reducer = red;
+		FunctionUtils.setFunctionRuntimeContext(red, getUdfRuntimeContext());
+
+		TypeSerializerFactory<IT> serializerFactory = this.config.getInputSerializer(0, userCodeClassLoader);
+		this.serializer = serializerFactory.getSerializer();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("ChainedAllReduceDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+		}
+	}
+
+	@Override
+	public void openTask() throws Exception {
+		Configuration stubConfig = this.config.getStubParameters();
+		RegularPactTask.openUserCode(this.reducer, stubConfig);
+	}
+
+	@Override
+	public void closeTask() throws Exception {
+		RegularPactTask.closeUserCode(this.reducer);
+	}
+
+	@Override
+	public void cancelTask() {
+		try {
+			FunctionUtils.closeFunction(this.reducer);
+		} catch (Throwable t) {
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	@Override
+	public Function getStub() {
+		return this.reducer;
+	}
+
+	@Override
+	public String getTaskName() {
+		return this.taskName;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	@Override
+	public void collect(IT record) {
+		try {
+			if (base == null) {
+				base = objectReuseEnabled ? record : serializer.copy(record);
+			} else {
+				base = objectReuseEnabled ? reducer.reduce(base, record) : serializer.copy(reducer.reduce(base, record));
+			}
+		} catch (Exception e) {
+			throw new ExceptionInChainedStubException(taskName, e);
+		}
+	}
+
+	@Override
+	public void close() {
+		try {
+			if (base != null) {
+				this.outputCollector.collect(base);
+				base = null;
+			}
+		} catch (Exception e) {
+			throw new ExceptionInChainedStubException(this.taskName, e);
+		}
+		this.outputCollector.close();
+	}
+}

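For illustration only (not part of the commit; class name, values, and output path are placeholders): the user-facing pattern the new chained driver covers is a reduce without any grouping, i.e. an "all reduce" over the entire DataSet, which the ALL_REDUCE strategy above now maps to ChainedAllReduceDriver when chaining applies.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class AllReduceSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> sum = env.fromElements(1, 2, 3, 4, 5)
                // no groupBy(): all records fall into a single group (ALL_REDUCE)
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer a, Integer b) throws Exception {
                        return a + b;
                    }
                });

        sum.writeAsText("/path/to/out");
        env.execute();
    }
}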

[5/7] flink git commit: [FLINK-1928] [hbase] Added HBase write example

Posted by se...@apache.org.
[FLINK-1928] [hbase] Added HBase write example


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be621cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be621cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be621cf

Branch: refs/heads/master
Commit: 3be621cfc270e7aee24b0a5438acbbe191606aea
Parents: cafb876
Author: fpompermaier <f....@gmail.com>
Authored: Mon Apr 27 17:15:18 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-hbase/pom.xml               |  38 +++-
 .../hbase/example/HBaseFlinkTestConstants.java  |  10 +
 .../addons/hbase/example/HBaseReadExample.java  |  11 +-
 .../addons/hbase/example/HBaseWriteExample.java | 202 +++++++++++++++++++
 4 files changed, 251 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/pom.xml b/flink-staging/flink-hbase/pom.xml
index f8c16f3..805ff4b 100644
--- a/flink-staging/flink-hbase/pom.xml
+++ b/flink-staging/flink-hbase/pom.xml
@@ -34,8 +34,8 @@ under the License.
 	<packaging>jar</packaging>
 
 	<properties>
-		<hbase.hadoop1.version>0.98.6.1-hadoop1</hbase.hadoop1.version>
-		<hbase.hadoop2.version>0.98.6.1-hadoop2</hbase.hadoop2.version>
+		<hbase.hadoop1.version>0.98.11-hadoop1</hbase.hadoop1.version>
+		<hbase.hadoop2.version>0.98.11-hadoop2</hbase.hadoop2.version>
 	</properties>
 
 	<dependencies>
@@ -82,10 +82,12 @@ under the License.
 				</exclusion>
 			</exclusions>
 		</dependency>
-		
+
+		<!-- HBase server needed for TableOutputFormat -->
+		<!-- TODO implement bulk output format for HBase -->
 		<dependency>
 			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-client</artifactId>
+			<artifactId>hbase-server</artifactId>
 			<version>${hbase.version}</version>
 			<exclusions>
 				<!-- Remove unneeded dependency, which is conflicting with our jetty-util version. -->
@@ -93,6 +95,26 @@ under the License.
 					<groupId>org.mortbay.jetty</groupId>
 					<artifactId>jetty-util</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-sslengine</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-api-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>servlet-api-2.5</artifactId>
+				</exclusion>
 				<!-- The hadoop dependencies are handled through flink-shaded-hadoop -->
 				<exclusion>
 					<groupId>org.apache.hadoop</groupId>
@@ -110,6 +132,14 @@ under the License.
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-mapreduce-client-core</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
new file mode 100644
index 0000000..5881020
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -0,0 +1,10 @@
+package org.apache.flink.addons.hbase.example;
+
+public class HBaseFlinkTestConstants {
+	
+	public static final byte[] CF_SOME = "someCf".getBytes();
+	public static final byte[] Q_SOME = "someQual".getBytes();
+	public static final String TEST_TABLE_NAME = "test-table";
+	public static final String TMP_DIR = "/tmp/test";
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
index b6f345a..dccf876 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Simple stub for HBase DataSet
+ * Simple stub for HBase DataSet read
  * 
  * To run the test first create the test table with hbase shell.
  * 
@@ -47,17 +47,16 @@ public class HBaseReadExample {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		@SuppressWarnings("serial")
 		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
-			private final byte[] CF_SOME = "someCf".getBytes();
-			private final byte[] Q_SOME = "someQual".getBytes();
+			
 				@Override
 				public String getTableName() {
-					return "test-table";
+					return HBaseFlinkTestConstants.TEST_TABLE_NAME;
 				}
 
 				@Override
 				protected Scan getScanner() {
 					Scan scan = new Scan();
-					scan.addColumn(CF_SOME, Q_SOME);
+					scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
 					return scan;
 				}
 
@@ -66,7 +65,7 @@ public class HBaseReadExample {
 				@Override
 				protected Tuple2<String, String> mapResultToTuple(Result r) {
 					String key = Bytes.toString(r.getRow());
-					String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME));
+					String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
 					reuse.setField(key, 0);
 					reuse.setField(val, 1);
 					return reuse;

http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
new file mode 100644
index 0000000..483bdff
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Simple stub for HBase DataSet write
+ * 
+ * To run the test first create the test table with hbase shell.
+ * 
+ * Use the following commands:
+ * <ul>
+ *     <li>create 'test-table', 'someCf'</li>
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+		
+		DataSet<Tuple2<String, Integer>> counts = 
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		Job job = Job.getInstance();
+		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
+		// TODO is "mapred.output.dir" really useful?
+		job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
+		counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+			private transient Tuple2<Text, Mutation> reuse;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				reuse = new Tuple2<Text, Mutation>();
+			}
+
+			@Override
+			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+				reuse.f0 = new Text(t.f0);
+				Put put = new Put(t.f0.getBytes());
+				put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+				reuse.f1 = put;
+				return reuse;
+			}
+		}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
+		
+		// execute program
+		env.execute("WordCount (HBase sink) Example");
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined
+	 * FlatMapFunction. The function takes a line (String) and splits it into 
+	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputTableName = args[1];
+			} else {
+				System.err.println("Usage: HBaseWriteExample <text path> <output table>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing HBaseWriteExample example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: HBaseWriteExample <text path> <output table>");
+		}
+		return true;
+	}
+	
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return getDefaultTextLineDataSet(env);
+		}
+	}
+	private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+		return env.fromElements(WORDS);
+	}
+	private static final String[] WORDS = new String[] {
+		"To be, or not to be,--that is the question:--",
+		"Whether 'tis nobler in the mind to suffer",
+		"The slings and arrows of outrageous fortune",
+		"Or to take arms against a sea of troubles,",
+		"And by opposing end them?--To die,--to sleep,--",
+		"No more; and by a sleep to say we end",
+		"The heartache, and the thousand natural shocks",
+		"That flesh is heir to,--'tis a consummation",
+		"Devoutly to be wish'd. To die,--to sleep;--",
+		"To sleep! perchance to dream:--ay, there's the rub;",
+		"For in that sleep of death what dreams may come,",
+		"When we have shuffled off this mortal coil,",
+		"Must give us pause: there's the respect",
+		"That makes calamity of so long life;",
+		"For who would bear the whips and scorns of time,",
+		"The oppressor's wrong, the proud man's contumely,",
+		"The pangs of despis'd love, the law's delay,",
+		"The insolence of office, and the spurns",
+		"That patient merit of the unworthy takes,",
+		"When he himself might his quietus make",
+		"With a bare bodkin? who would these fardels bear,",
+		"To grunt and sweat under a weary life,",
+		"But that the dread of something after death,--",
+		"The undiscover'd country, from whose bourn",
+		"No traveller returns,--puzzles the will,",
+		"And makes us rather bear those ills we have",
+		"Than fly to others that we know not of?",
+		"Thus conscience does make cowards of us all;",
+		"And thus the native hue of resolution",
+		"Is sicklied o'er with the pale cast of thought;",
+		"And enterprises of great pith and moment,",
+		"With this regard, their currents turn awry,",
+		"And lose the name of action.--Soft you now!",
+		"The fair Ophelia!--Nymph, in thy orisons",
+		"Be all my sins remember'd."
+	};
+}
\ No newline at end of file