You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/06/18 21:40:40 UTC

[1/3] flink git commit: [FLINK-3641] Add documentation for DataSet distributed cache.

Repository: flink
Updated Branches:
  refs/heads/master bce355093 -> ba62df14a


[FLINK-3641] Add documentation for DataSet distributed cache.

This closes #2122


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba62df14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba62df14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba62df14

Branch: refs/heads/master
Commit: ba62df14a52660cec85783b1070821acd144fd06
Parents: 5a0c268
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Jun 15 14:19:43 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/index.md | 106 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 105 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba62df14/docs/apis/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/index.md b/docs/apis/batch/index.md
index 993fb72..42c6c92 100644
--- a/docs/apis/batch/index.md
+++ b/docs/apis/batch/index.md
@@ -2019,6 +2019,110 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio
 
 {% top %}
 
+Distributed Cache
+-------------------
+
+Flink offers a distributed cache, similar to the one in Apache Hadoop, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.
+
+The cache works as follows. A program registers a file or directory of a [local or remote filesystem such as HDFS or S3]({{ site.baseurl }}/apis/batch/connectors.html#reading-from-file-systems) under a specific name in its `ExecutionEnvironment` as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can look up the file or directory under the specified name and access it from the worker's local filesystem. 
+
+The distributed cache is used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Register the file or directory in the `ExecutionEnvironment`.
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// register a file from HDFS
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
+
+// register a local executable file (script, executable, ...)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);
+
+// define your program and execute
+...
+DataSet<String> input = ...
+DataSet<Integer> result = input.map(new MyMapper());
+...
+env.execute();
+{% endhighlight %}
+
+Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class because it needs access to the `RuntimeContext`.
+
+{% highlight java %}
+
+// extend a RichFunction to have access to the RuntimeContext
+public final class MyMapper extends RichMapFunction<String, Integer> {
+    
+    @Override
+    public void open(Configuration config) {
+      
+      // access cached file via RuntimeContext and DistributedCache
+      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
+      // read the file (or navigate the directory)
+      ...
+    }
+
+    @Override
+    public Integer map(String value) throws Exception {
+      // use content of cached file
+      ...
+    }
+}
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+Register the file or directory in the `ExecutionEnvironment`.
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// register a file from HDFS
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
+
+// register a local executable file (script, executable, ...)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
+
+// define your program and execute
+...
+val input: DataSet[String] = ...
+val result: DataSet[Int] = input.map(new MyMapper())
+...
+env.execute()
+{% endhighlight %}
+
+Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class because it needs access to the `RuntimeContext`.
+
+{% highlight scala %}
+
+// extend a RichFunction to have access to the RuntimeContext
+class MyMapper extends RichMapFunction[String, Int] {
+
+  override def open(config: Configuration): Unit = {
+
+    // access cached file via RuntimeContext and DistributedCache
+    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
+    // read the file (or navigate the directory)
+    ...
+  }
+
+  override def map(value: String): Int = {
+    // use content of cached file
+    ...
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
+
+{% top %}
+
 Passing Parameters to Functions
 -------------------
 
@@ -2067,7 +2171,7 @@ class MyFilter(limit: Int) extends FilterFunction[Int] {
 
 #### Via `withParameters(Configuration)`
 
-This method takes a Configuration object as an argument, which will be passed to the [rich function](#rich-functions)'s `open()`
+This method takes a Configuration object as an argument, which will be passed to the [rich function]({{ site.baseurl }}/apis/common/index.html#rich-functions)'s `open()`
 method. The Configuration object is a Map from String keys to different value types.
 
 <div class="codetabs" markdown="1">


[3/3] flink git commit: [FLINK-3977] [dataStream] InternalWindowFunctions implement OutputTypeConfigurable.

Posted by fh...@apache.org.
[FLINK-3977] [dataStream] InternalWindowFunctions implement OutputTypeConfigurable.

- setOutputType calls are forwarded to wrapped functions.
- test added for InternalWindowFunctions.

This closes #2118


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18744b2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18744b2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18744b2c

Branch: refs/heads/master
Commit: 18744b2c846aa51ed317c4c7409519f25e3eafb7
Parents: bce3550
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jun 16 09:09:40 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 .../InternalIterableAllWindowFunction.java      |  11 +
 .../InternalIterableWindowFunction.java         |  11 +
 .../InternalSingleValueAllWindowFunction.java   |  11 +
 .../InternalSingleValueWindowFunction.java      |  11 +
 .../functions/InternalWindowFunction.java       |   5 +-
 .../functions/InternalWindowFunctionTest.java   | 225 +++++++++++++++++++
 6 files changed, 273 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 3a4be91..522d3ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -72,4 +75,12 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
 		throw new RuntimeException("This should never be called.");
 
 	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
+			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 822a57c..2598557 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -72,4 +75,12 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window
 		throw new RuntimeException("This should never be called.");
 
 	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
+			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index aa6e196..6db711f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -74,4 +77,12 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo
 		throw new RuntimeException("This should never be called.");
 
 	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
+			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 661473d..727f2e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -74,4 +77,12 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win
 		throw new RuntimeException("This should never be called.");
 
 	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
+			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index e75f3be..a7d18de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -30,7 +31,9 @@ import java.io.Serializable;
  * @param <OUT> The type of the output value.
  * @param <KEY> The type of the key.
  */
-public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window> implements Function, Serializable {
+public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window>
+		implements Function, Serializable, OutputTypeConfigurable<OUT> {
+
 	private static final long serialVersionUID = 1L;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
new file mode 100644
index 0000000..fae2cd0
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import org.apache.flink.util.Collector;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class InternalWindowFunctionTest {
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalIterableAllWindowFunction() throws Exception {
+
+		AllWindowFunctionMock mock = mock(AllWindowFunctionMock.class);
+		InternalIterableAllWindowFunction<Long, String, TimeWindow> windowFunction =
+			new InternalIterableAllWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		windowFunction.setOutputType(stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(((byte)0), w, i, c);
+		verify(mock).apply(w, i, c);
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalIterableWindowFunction() throws Exception {
+
+		WindowFunctionMock mock = mock(WindowFunctionMock.class);
+		InternalIterableWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+			new InternalIterableWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		windowFunction.setOutputType(stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(42L, w, i, c);
+		verify(mock).apply(42L, w, i, c);
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalSingleValueWindowFunction() throws Exception {
+
+		WindowFunctionMock mock = mock(WindowFunctionMock.class);
+		InternalSingleValueWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+			new InternalSingleValueWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		windowFunction.setOutputType(stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(42L, w, 23L, c);
+		verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalSingleValueAllWindowFunction() throws Exception {
+
+		AllWindowFunctionMock mock = mock(AllWindowFunctionMock.class);
+		InternalSingleValueAllWindowFunction<Long, String, TimeWindow> windowFunction =
+			new InternalSingleValueAllWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		windowFunction.setOutputType(stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(((byte)0), w, 23L, c);
+		verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	public static class WindowFunctionMock
+		extends RichWindowFunction<Long, String, Long, TimeWindow>
+		implements OutputTypeConfigurable<String> {
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void apply(Long aLong, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception { }
+	}
+
+	public static class AllWindowFunctionMock
+		extends RichAllWindowFunction<Long, String, TimeWindow>
+		implements OutputTypeConfigurable<String> {
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void apply(TimeWindow window, Iterable<Long> values, Collector<String> out) throws Exception { }
+	}
+}


[2/3] flink git commit: [FLINK-3949] [metrics] Add numSplitsProcessed counter metric.

Posted by fh...@apache.org.
[FLINK-3949] [metrics] Add numSplitsProcessed counter metric.

This closes #2119


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a0c268d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a0c268d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a0c268d

Branch: refs/heads/master
Commit: 5a0c268dbd4abdf39c7b9d8f25ea629dfd4681b1
Parents: 18744b2
Author: zentol <ch...@apache.org>
Authored: Fri Jun 17 09:40:01 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/operators/DataSourceTask.java    | 4 ++--
 .../api/functions/source/ContinuousFileReaderOperator.java    | 3 +++
 .../api/functions/source/InputFormatSourceFunction.java       | 3 +++
 .../api/functions/source/InputFormatSourceFunctionTest.java   | 7 +++++++
 4 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index c57f133..68e29b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -101,7 +101,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		LOG.debug(getLogString("Starting data source operator"));
 
 		RuntimeContext ctx = createRuntimeContext();
-		Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
+		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
 		Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");
 
 		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
@@ -172,7 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					// close. We close here such that a regular close throwing an exception marks a task as failed.
 					format.close();
 				}
-				splitCounter.inc();
+				completedSplitsCounter.inc();
 			} // end for all input splits
 
 			// close the collector. if it is a chaining task collector, it will close its chained tasks

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 9319338..455c753 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -235,6 +236,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		public void run() {
 			try {
 
+				Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
 				this.format.openInputFormat();
 
 				while (this.isRunning) {
@@ -290,6 +292,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 								}
 							}
 						}
+						completedSplitsCounter.inc();
 
 					} finally {
 						// close and prepare for the next iteration

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index bce1ec5..f35cbba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
@@ -70,6 +71,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
 	public void run(SourceContext<OUT> ctx) throws Exception {
 		try {
 
+			Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
 			if (isRunning && format instanceof RichInputFormat) {
 				((RichInputFormat) format).openInputFormat();
 			}
@@ -86,6 +88,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
 					ctx.collect(nextElement);
 				}
 				format.close();
+				completedSplitsCounter.inc();
 
 				if (isRunning) {
 					isRunning = splitIterator.hasNext();

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 32776e1..a41c7db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -253,6 +255,11 @@ public class InputFormatSourceFunctionTest {
 		}
 
 		@Override
+		public MetricGroup getMetricGroup() {
+			return new UnregisteredMetricsGroup();
+		}
+
+		@Override
 		public InputSplitProvider getInputSplitProvider() {
 			try {
 				this.inputSplits = format.createInputSplits(noOfSplits);