You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/04/14 00:26:48 UTC
[1/2] flink git commit: [FLINK-6298] [java api] Set context in RichOutputFormat
Repository: flink
Updated Branches:
refs/heads/master 43d3046bb -> a30f29f68
[FLINK-6298] [java api] Set context in RichOutputFormat
This closes #3716
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dde7327
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dde7327
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dde7327
Branch: refs/heads/master
Commit: 1dde7327058cb973bd4ff94bf5531d5093fb60a2
Parents: 43d3046
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Thu Apr 13 17:23:15 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Apr 13 16:15:27 2017 -0400
----------------------------------------------------------------------
.../sink/OutputFormatSinkFunction.java | 9 +++++
.../sink/OutputFormatSinkFunctionTest.java | 42 ++++++++++++++++++++
2 files changed, 51 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1dde7327/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
index 16ec3f5..3f40095 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
@@ -61,6 +62,14 @@ public class OutputFormatSinkFunction<IN> extends RichSinkFunction<IN> implement
}
@Override
+ public void setRuntimeContext(RuntimeContext context) {
+ super.setRuntimeContext(context);
+ if (format instanceof RichOutputFormat) {
+ ((RichOutputFormat) format).setRuntimeContext(context);
+ }
+ }
+
+ @Override
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
if (format instanceof InputTypeConfigurable) {
InputTypeConfigurable itc = (InputTypeConfigurable) format;
http://git-wip-us.apache.org/repos/asf/flink/blob/1dde7327/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
new file mode 100644
index 0000000..d3a9d3d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class OutputFormatSinkFunctionTest {
+
+ @Test
+ public void setRuntimeContext() throws Exception {
+ RuntimeContext mockRuntimeContext = Mockito.mock(RuntimeContext.class);
+
+ // Make sure setRuntimeContext of the rich output format is called
+ RichOutputFormat<?> mockRichOutputFormat = Mockito.mock(RichOutputFormat.class);
+ new OutputFormatSinkFunction<>(mockRichOutputFormat).setRuntimeContext(mockRuntimeContext);
+ Mockito.verify(mockRichOutputFormat, Mockito.times(1)).setRuntimeContext(Mockito.eq(mockRuntimeContext));
+
+ // Make sure setRuntimeContext works well when the output format is not a RichOutputFormat
+ OutputFormat<?> mockOutputFormat = Mockito.mock(OutputFormat.class);
+ new OutputFormatSinkFunction<>(mockOutputFormat).setRuntimeContext(mockRuntimeContext);
+ }
+}
[2/2] flink git commit: [FLINK-6304] [table] Remove unused imports
Posted by gr...@apache.org.
[FLINK-6304] [table] Remove unused imports
This closes #3717
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a30f29f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a30f29f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a30f29f6
Branch: refs/heads/master
Commit: a30f29f6895591997a732714630e79c097dff5fc
Parents: 1dde732
Author: sunjincheng121 <su...@gmail.com>
Authored: Thu Apr 13 18:01:40 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Apr 13 16:15:34 2017 -0400
----------------------------------------------------------------------
.../scala/org/apache/flink/table/plan/nodes/CommonScan.scala | 1 -
.../scala/org/apache/flink/table/plan/nodes/FlinkRel.scala | 6 +-----
.../table/plan/rules/datastream/DataStreamAggregateRule.scala | 1 -
.../plan/rules/datastream/DataStreamOverAggregateRule.scala | 3 ---
.../plan/rules/datastream/StreamTableSourceScanRule.scala | 3 +--
.../aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala | 3 +--
6 files changed, 3 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 274b602..0a0d204 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.plan.nodes
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.MapRunner
http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 258d7f2..1aa084b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -22,11 +22,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlAsOperator
import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.api.TableException
import scala.collection.JavaConversions._
http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
index 09f05d7..052f738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTrait
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.Alias
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
index 214d68b..dc46753 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -23,12 +23,9 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalWindow
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
-import scala.collection.JavaConversions._
-
/**
* Rule to convert a LogicalWindow into a DataStreamOverAggregate.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index c6c7c59..a6db084 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -23,8 +23,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.table.plan.nodes.datastream.
- {StreamTableSourceScan, DataStreamConvention}
+import org.apache.flink.table.plan.nodes.datastream.{StreamTableSourceScan, DataStreamConvention}
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.StreamTableSource
http://git-wip-us.apache.org/repos/asf/flink/blob/a30f29f6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
index 5f37b8a..d7b0cef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
@@ -20,10 +20,9 @@ package org.apache.flink.table.runtime.aggregate
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector
/**