You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/04/14 00:26:48 UTC

[1/2] flink git commit: [FLINK-6298] [java api] Set context in RichOutputFormat

Repository: flink
Updated Branches:
  refs/heads/master 43d3046bb -> a30f29f68


[FLINK-6298] [java api] Set context in RichOutputFormat

This closes #3716


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dde7327
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dde7327
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dde7327

Branch: refs/heads/master
Commit: 1dde7327058cb973bd4ff94bf5531d5093fb60a2
Parents: 43d3046
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Thu Apr 13 17:23:15 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Apr 13 16:15:27 2017 -0400

----------------------------------------------------------------------
 .../sink/OutputFormatSinkFunction.java          |  9 +++++
 .../sink/OutputFormatSinkFunctionTest.java      | 42 ++++++++++++++++++++
 2 files changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1dde7327/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
index 16ec3f5..3f40095 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
@@ -61,6 +62,14 @@ public class OutputFormatSinkFunction<IN> extends RichSinkFunction<IN> implement
 	}
 
 	@Override
+	public void setRuntimeContext(RuntimeContext context) {
+		super.setRuntimeContext(context);
+		if (format instanceof RichOutputFormat) {
+			((RichOutputFormat) format).setRuntimeContext(context);
+		}
+	}
+
+	@Override
 	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 		if (format instanceof InputTypeConfigurable) {
 			InputTypeConfigurable itc = (InputTypeConfigurable) format;

http://git-wip-us.apache.org/repos/asf/flink/blob/1dde7327/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
new file mode 100644
index 0000000..d3a9d3d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class OutputFormatSinkFunctionTest {
+
+	@Test
+	public void setRuntimeContext() throws Exception {
+		RuntimeContext mockRuntimeContext = Mockito.mock(RuntimeContext.class);
+
+		// Make sure setRuntimeContext of the rich output format is called
+		RichOutputFormat<?> mockRichOutputFormat = Mockito.mock(RichOutputFormat.class);
+		new OutputFormatSinkFunction<>(mockRichOutputFormat).setRuntimeContext(mockRuntimeContext);
+		Mockito.verify(mockRichOutputFormat, Mockito.times(1)).setRuntimeContext(Mockito.eq(mockRuntimeContext));
+
+		// Make sure setRuntimeContext works when the output format is not a RichOutputFormat
+		OutputFormat<?> mockOutputFormat = Mockito.mock(OutputFormat.class);
+		new OutputFormatSinkFunction<>(mockOutputFormat).setRuntimeContext(mockRuntimeContext);
+	}
+}


[2/2] flink git commit: [FLINK-6304] [table] Remove unused imports

Posted by gr...@apache.org.
[FLINK-6304] [table] Remove unused imports

This closes #3717


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a30f29f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a30f29f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a30f29f6

Branch: refs/heads/master
Commit: a30f29f6895591997a732714630e79c097dff5fc
Parents: 1dde732
Author: sunjincheng121 <su...@gmail.com>
Authored: Thu Apr 13 18:01:40 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Apr 13 16:15:34 2017 -0400

----------------------------------------------------------------------
 .../scala/org/apache/flink/table/plan/nodes/CommonScan.scala   | 1 -
 .../scala/org/apache/flink/table/plan/nodes/FlinkRel.scala     | 6 +-----
 .../table/plan/rules/datastream/DataStreamAggregateRule.scala  | 1 -
 .../plan/rules/datastream/DataStreamOverAggregateRule.scala    | 3 ---
 .../plan/rules/datastream/StreamTableSourceScanRule.scala      | 3 +--
 .../aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala   | 3 +--
 6 files changed, 3 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 274b602..0a0d204 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.plan.nodes
 
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.MapRunner

http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 258d7f2..1aa084b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -22,11 +22,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlAsOperator
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.api.TableException
 
 import scala.collection.JavaConversions._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
index 09f05d7..052f738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTrait
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.Alias
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
index 214d68b..dc46753 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -23,12 +23,9 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalWindow
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
 import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
 
-import scala.collection.JavaConversions._
-
 /**
   * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
   */

http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index c6c7c59..a6db084 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -23,8 +23,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.table.plan.nodes.datastream.
-  {StreamTableSourceScan, DataStreamConvention}
+import org.apache.flink.table.plan.nodes.datastream.{StreamTableSourceScan, DataStreamConvention}
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.StreamTableSource
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
index 5f37b8a..d7b0cef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
@@ -20,10 +20,9 @@ package org.apache.flink.table.runtime.aggregate
 import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector
 
 
 /**