You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/26 01:20:41 UTC
[3/8] flink git commit: [FLINK-2906] Remove Record API
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
deleted file mode 100644
index a31448d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Record;
-
-import java.util.Arrays;
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- *
- * A DeltaIteration is similar to a {@link BulkIteration},
- * but maintains state across the individual iteration steps. The state is called the <i>solution set</i>, can be obtained
- * via {@link #getSolutionSet()}, and be accessed by joining (or CoGrouping) with it. The solution
- * set is updated by producing a delta for it, which is merged into the solution set at the end of each iteration step.
- * <p>
- * The delta iteration must be closed by setting a delta for the solution set ({@link #setSolutionSetDelta(org.apache.flink.api.common.operators.Operator)})
- * and the new workset (the data set that will be fed back, {@link #setNextWorkset(org.apache.flink.api.common.operators.Operator)}).
- * The DeltaIteration itself represents the result after the iteration has terminated.
- * Delta iterations terminate when the feed back data set (the workset) is empty.
- * In addition, a maximum number of steps is given as a fall back termination guard.
- * <p>
- * Elements in the solution set are uniquely identified by a key. When merging the solution set delta, contained elements
- * with the same key are replaced.
- * <p>
- * This class is a subclass of {@code DualInputOperator}. The solution set is considered the first input, the
- * workset is considered the second input.
- */
-@Deprecated
-public class DeltaIteration extends DeltaIterationBase<Record, Record> {
-
- public DeltaIteration(int keyPosition) {
- super(OperatorInfoHelper.binary(), keyPosition);
- }
-
- public DeltaIteration(int[] keyPositions) {
- super(OperatorInfoHelper.binary(), keyPositions);
- }
-
- public DeltaIteration(int keyPosition, String name) {
- super(OperatorInfoHelper.binary(), keyPosition, name);
- }
-
- public DeltaIteration(int[] keyPositions, String name) {
- super(OperatorInfoHelper.binary(), keyPositions, name);
- }
-
- /**
- * Specialized operator to use as a recognizable place-holder for the working set input to the
- * step function.
- */
- public static class WorksetPlaceHolder extends DeltaIterationBase.WorksetPlaceHolder<Record> {
- public WorksetPlaceHolder(DeltaIterationBase<?, Record> container) {
- super(container, new OperatorInformation<Record>(new RecordTypeInfo()));
- }
- }
-
- /**
- * Specialized operator to use as a recognizable place-holder for the solution set input to the
- * step function.
- */
- public static class SolutionSetPlaceHolder extends DeltaIterationBase.SolutionSetPlaceHolder<Record> {
- public SolutionSetPlaceHolder(DeltaIterationBase<Record, ?> container) {
- super(container, new OperatorInformation<Record>(new RecordTypeInfo()));
- }
-
- public void checkJoinKeyFields(int[] keyFields) {
- int[] ssKeys = containingIteration.getSolutionSetKeyFields();
- if (!Arrays.equals(ssKeys, keyFields)) {
- throw new InvalidProgramException("The solution can only be joined/co-grouped with the same keys as the elements are identified with (here: " + Arrays.toString(ssKeys) + ").");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
deleted file mode 100644
index 9b790e6..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.FileDataSinkBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Nothing;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- *
- * Operator for nodes which act as data sinks, storing the data they receive in a file instead of sending it to another
- * contract. The encoding of the data in the file is handled by the {@link FileOutputFormat}.
- *
- * @see FileOutputFormat
- */
-@Deprecated
-public class FileDataSink extends FileDataSinkBase<Record> {
-
- private static String DEFAULT_NAME = "<Unnamed File Data Sink>";
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation
- * and the given name, writing to the file indicated by the given path.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public FileDataSink(FileOutputFormat<Record> f, String filePath, String name) {
- super(f, new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()), filePath, name);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation
- * and a default name, writing to the file indicated by the given path.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- */
- public FileDataSink(FileOutputFormat<Record> f, String filePath) {
- this(f, filePath, DEFAULT_NAME);
- }
-
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation the default name,
- * writing to the file indicated by the given path. It uses the given contract as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contract to use as the input.
- */
- public FileDataSink(FileOutputFormat<Record> f, String filePath, Operator<Record> input) {
- this(f, filePath);
- setInput(input);
-
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation the default name,
- * writing to the file indicated by the given path. It uses the given contracts as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contracts to use as the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public FileDataSink(FileOutputFormat<Record> f, String filePath, List<Operator<Record>> input) {
- this(f, filePath, input, DEFAULT_NAME);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
- * writing to the file indicated by the given path. It uses the given contract as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contract to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public FileDataSink(FileOutputFormat<Record> f, String filePath, Operator<Record> input, String name) {
- this(f, filePath, name);
- setInput(input);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
- * writing to the file indicated by the given path. It uses the given contracts as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contracts to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public FileDataSink(FileOutputFormat<Record> f, String filePath, List<Operator<Record>> input, String name) {
- this(f, filePath, name);
- Preconditions.checkNotNull(input, "The input must not be null.");
- setInput(Operator.createUnionCascade(input));
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation
- * and the given name, writing to the file indicated by the given path.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, String name) {
- super(new UserCodeClassWrapper<FileOutputFormat<Record>>(f),
- new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()),
- filePath, name);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation
- * and a default name, writing to the file indicated by the given path.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- */
- public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath) {
- this(f, filePath, DEFAULT_NAME);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation the default name,
- * writing to the file indicated by the given path. It uses the given contract as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contract to use as the input.
- */
- public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, Operator<Record> input) {
- this(f, filePath, input, DEFAULT_NAME);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation the default name,
- * writing to the file indicated by the given path. It uses the given contracts as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contracts to use as the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, List<Operator<Record>> input) {
- this(f, filePath, input, DEFAULT_NAME);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
- * writing to the file indicated by the given path. It uses the given contract as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contract to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, Operator<Record> input, String name) {
- this(f, filePath, name);
- setInput(input);
- }
-
- /**
- * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
- * writing to the file indicated by the given path. It uses the given contracts as its input.
- *
- * @param f The {@link FileOutputFormat} implementation used to encode the data.
- * @param filePath The path to the file to write the contents to.
- * @param input The contracts to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, List<Operator<Record>> input, String name) {
- this(f, filePath, name);
- Preconditions.checkNotNull(input, "The inputs must not be null.");
- setInput(Operator.createUnionCascade(input));
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Returns the configured file path where the output is written to.
- *
- * @return The path to which the output shall be written.
- */
- public String getFilePath()
- {
- return this.filePath;
- }
-
-
- @Override
- public String toString() {
- return this.filePath;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java
deleted file mode 100644
index d5132b8..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.operators.base.FileDataSourceBase;
-import org.apache.flink.types.Record;
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- * Operator for input nodes which read data from files. (For Record data model)
- */
-@Deprecated
-public class FileDataSource extends FileDataSourceBase<Record> {
-
- /**
- * Creates a new instance for the given file using the given file input format.
- *
- * @param f The {@link FileInputFormat} implementation used to read the data.
- * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
- * @param name The given name for the Pact, used in plans, logs and progress messages.
- */
- public FileDataSource(FileInputFormat<Record> f, String filePath, String name) {
- super(f, OperatorInfoHelper.source(), filePath, name);
- }
-
- /**
- * Creates a new instance for the given file using the given input format. The contract has the default name.
- *
- * @param f The {@link FileInputFormat} implementation used to read the data.
- * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
- */
- public FileDataSource(FileInputFormat<Record> f, String filePath) {
- super(f, OperatorInfoHelper.source(), filePath);
- }
-
- /**
- * Creates a new instance for the given file using the given file input format.
- *
- * @param f The {@link FileInputFormat} implementation used to read the data.
- * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
- * @param name The given name for the Pact, used in plans, logs and progress messages.
- */
- public FileDataSource(Class<? extends FileInputFormat<Record>> f, String filePath, String name) {
- super(f, OperatorInfoHelper.source(), filePath, name);
- }
-
- /**
- * Creates a new instance for the given file using the given input format. The contract has the default name.
- *
- * @param f The {@link FileInputFormat} implementation used to read the data.
- * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
- */
- public FileDataSource(Class<? extends FileInputFormat<Record>> f, String filePath) {
- super(f, OperatorInfoHelper.source(), filePath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
deleted file mode 100644
index 22ae477..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Nothing;
-import org.apache.flink.types.Record;
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- * Operator for nodes that act as data sinks, storing the data they receive.
- * The way the data is stored is handled by the {@link OutputFormat}.
- */
-
-@Deprecated
-public class GenericDataSink extends GenericDataSinkBase<Record> {
-
- private static String DEFAULT_NAME = "<Unnamed Generic Record Data Sink>";
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation
- * and the given name.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public GenericDataSink(OutputFormat<Record> f, String name) {
- super(f, new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()), name);
- }
-
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation
- * and a default name.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- */
- public GenericDataSink(OutputFormat<Record> f) {
- this(f, DEFAULT_NAME);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation the default name.
- * It uses the given operator as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The operator to use as the input.
- */
- public GenericDataSink(OutputFormat<Record> f, Operator<Record> input) {
- this(f, input, DEFAULT_NAME);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation the default name.
- * It uses the given contracts as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The contracts to use as the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public GenericDataSink(OutputFormat<Record> f, List<Operator<Record>> input) {
- this(f, input, DEFAULT_NAME);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
- * It uses the given operator as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The operator to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public GenericDataSink(OutputFormat<Record> f, Operator<Record> input, String name) {
- this(f, name);
- setInput(input);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
- * It uses the given contracts as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The contracts to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public GenericDataSink(OutputFormat<Record> f, List<Operator<Record>> input, String name) {
- this(f, name);
- setInputs(input);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation
- * and the given name.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public GenericDataSink(Class<? extends OutputFormat<Record>> f, String name) {
- super(new UserCodeClassWrapper<OutputFormat<Record>>(f),
- new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()), name);
- }
-
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation
- * and a default name.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- */
- public GenericDataSink(Class<? extends OutputFormat<Record>> f) {
- this(f, DEFAULT_NAME);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation the default name.
- * It uses the given operator as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The operator to use as the input.
- */
- public GenericDataSink(Class<? extends OutputFormat<Record>> f, Operator<Record> input) {
- this(f, input, DEFAULT_NAME);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation the default name.
- * It uses the given contracts as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The contracts to use as the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public GenericDataSink(Class<? extends OutputFormat<Record>> f, List<Operator<Record>> input) {
- this(f, input, DEFAULT_NAME);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
- * It uses the given operator as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The operator to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- */
- public GenericDataSink(Class<? extends OutputFormat<Record>> f, Operator<Record> input, String name) {
- this(f, name);
- setInput(input);
- }
-
- /**
- * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
- * It uses the given contracts as its input.
- *
- * @param f The {@link OutputFormat} implementation used to sink the data.
- * @param input The contracts to use as the input.
- * @param name The given name for the sink, used in plans, logs and progress messages.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public GenericDataSink(Class<? extends OutputFormat<Record>> f, List<Operator<Record>> input, String name) {
- this(f, name);
- setInputs(input);
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Sets the input to the union of the given operators.
- *
- * @param inputs The operator(s) that form the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public void setInputs(Operator<Record>... inputs) {
- Preconditions.checkNotNull(inputs, "The inputs may not be null.");
- this.input = Operator.createUnionCascade(inputs);
- }
-
- /**
- * Sets the input to the union of the given operators.
- *
- * @param inputs The operator(s) that form the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public void setInputs(List<Operator<Record>> inputs) {
- Preconditions.checkNotNull(inputs, "The inputs may not be null.");
- this.input = Operator.createUnionCascade(inputs);
- }
-
- /**
- * Adds to the input the union of the given operators.
- *
- * @param inputs The operator(s) to be unioned with the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @Deprecated
- public void addInput(Operator<Record>... inputs) {
- Preconditions.checkNotNull(inputs, "The input may not be null.");
- this.input = Operator.createUnionCascade(this.input, inputs);
- }
-
- /**
- * Adds to the input the union of the given operators.
- *
- * @param inputs The operator(s) to be unioned with the input.
- * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
- */
- @SuppressWarnings("unchecked")
- @Deprecated
- public void addInputs(List<? extends Operator<Record>> inputs) {
- Preconditions.checkNotNull(inputs, "The inputs may not be null.");
- this.input = createUnionCascade(this.input, (Operator<Record>[]) inputs.toArray(new Operator[inputs.size()]));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java
deleted file mode 100644
index 39e7e3b..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.types.Record;
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- * Abstract superclass for data sources in a Pact plan.
- *
- * @param <T> The type of input format invoked by instances of this data source.
- */
-
-@Deprecated
-public class GenericDataSource<T extends InputFormat<Record, ?>> extends GenericDataSourceBase<Record, T> {
-
- /**
- * Creates a new instance for the given file using the given input format.
- *
- * @param format The {@link InputFormat} implementation used to read the data.
- * @param name The given name for the Pact, used in plans, logs and progress messages.
- */
- public GenericDataSource(T format,String name) {
- super(format, OperatorInfoHelper.source(), name);
- }
-
- /**
- * Creates a new instance for the given file using the given input format, using the default name.
- *
- * @param format The {@link InputFormat} implementation used to read the data.
- */
- public GenericDataSource(T format) {
- super(format, OperatorInfoHelper.source());
- }
-
- /**
- * Creates a new instance for the given file using the given input format.
- *
- * @param format The {@link InputFormat} implementation used to read the data.
- * @param name The given name for the Pact, used in plans, logs and progress messages.
- */
- public GenericDataSource(Class<? extends T> format, String name) {
- super(format, OperatorInfoHelper.source(), name);
- }
-
- /**
- * Creates a new instance for the given file using the given input format, using the default name.
- *
- * @param format The {@link InputFormat} implementation used to read the data.
- */
- public GenericDataSource(Class<? extends T> format) {
- super(format, OperatorInfoHelper.source());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
deleted file mode 100644
index 75744fe..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- *
- * JoinOperator that applies a {@link JoinFunction} to each pair of records from both inputs
- * that have matching keys.
- */
-
-@Deprecated
-public class JoinOperator extends InnerJoinOperatorBase<Record, Record, Record, JoinFunction> implements RecordOperator {
-
- /**
- * The types of the keys that the operator operates on.
- */
- private final Class<? extends Key<?>>[] keyTypes;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a Builder with the provided {@link JoinFunction} implementation
- *
- * @param udf The {@link JoinFunction} implementation for this join.
- * @param keyClass The class of the key data type.
- * @param keyColumn1 The position of the key in the first input's records.
- * @param keyColumn2 The position of the key in the second input's records.
- */
- public static Builder builder(JoinFunction udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
- return new Builder(new UserCodeObjectWrapper<JoinFunction>(udf), keyClass, keyColumn1, keyColumn2);
- }
-
- /**
- * Creates a Builder with the provided {@link JoinFunction} implementation
- *
- * @param udf The {@link JoinFunction} implementation for this Match operator.
- * @param keyClass The class of the key data type.
- * @param keyColumn1 The position of the key in the first input's records.
- * @param keyColumn2 The position of the key in the second input's records.
- */
- public static Builder builder(Class<? extends JoinFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
- return new Builder(new UserCodeClassWrapper<JoinFunction>(udf), keyClass, keyColumn1, keyColumn2);
- }
-
- /**
- * The private constructor that only gets invoked from the Builder.
- * @param builder
- */
- protected JoinOperator(Builder builder) {
- super(builder.udf, OperatorInfoHelper.binary(), builder.getKeyColumnsArray1(), builder.getKeyColumnsArray2(), builder.name);
- this.keyTypes = builder.getKeyClassesArray();
-
- if (builder.inputs1 != null && !builder.inputs1.isEmpty()) {
- setFirstInput(Operator.createUnionCascade(builder.inputs1));
- }
- if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
- setSecondInput(Operator.createUnionCascade(builder.inputs2));
- }
-
- // sanity check solution set key mismatches
- if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
- int[] positions = getKeyColumns(0);
- ((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions);
- }
- if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
- int[] positions = getKeyColumns(1);
- ((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions);
- }
-
- setBroadcastVariables(builder.broadcastInputs);
- setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));
- }
-
- @Override
- public Class<? extends Key<?>>[] getKeyClasses() {
- return this.keyTypes;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
- */
- public static class Builder {
-
- /* The required parameters */
- private final UserCodeWrapper<JoinFunction> udf;
- private final List<Class<? extends Key<?>>> keyClasses;
- private final List<Integer> keyColumns1;
- private final List<Integer> keyColumns2;
-
- /* The optional parameters */
- private List<Operator<Record>> inputs1;
- private List<Operator<Record>> inputs2;
- private Map<String, Operator<Record>> broadcastInputs;
- private String name;
-
-
- /**
- * Creates a Builder with the provided {@link JoinFunction} implementation
- *
- * @param udf The {@link JoinFunction} implementation for this Match operator.
- * @param keyClass The class of the key data type.
- * @param keyColumn1 The position of the key in the first input's records.
- * @param keyColumn2 The position of the key in the second input's records.
- */
- protected Builder(UserCodeWrapper<JoinFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
- this.udf = udf;
- this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
- this.keyClasses.add(keyClass);
- this.keyColumns1 = new ArrayList<Integer>();
- this.keyColumns1.add(keyColumn1);
- this.keyColumns2 = new ArrayList<Integer>();
- this.keyColumns2.add(keyColumn2);
- this.inputs1 = new ArrayList<Operator<Record>>();
- this.inputs2 = new ArrayList<Operator<Record>>();
- this.broadcastInputs = new HashMap<String, Operator<Record>>();
- }
-
- /**
- * Creates a Builder with the provided {@link JoinFunction} implementation. This method is intended
- * for special case sub-types only.
- *
- * @param udf The {@link JoinFunction} implementation for this Match operator.
- */
- protected Builder(UserCodeWrapper<JoinFunction> udf) {
- this.udf = udf;
- this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
- this.keyColumns1 = new ArrayList<Integer>();
- this.keyColumns2 = new ArrayList<Integer>();
- this.inputs1 = new ArrayList<Operator<Record>>();
- this.inputs2 = new ArrayList<Operator<Record>>();
- this.broadcastInputs = new HashMap<String, Operator<Record>>();
- }
-
- private int[] getKeyColumnsArray1() {
- int[] result = new int[keyColumns1.size()];
- for (int i = 0; i < keyColumns1.size(); ++i) {
- result[i] = keyColumns1.get(i);
- }
- return result;
- }
-
- private int[] getKeyColumnsArray2() {
- int[] result = new int[keyColumns2.size()];
- for (int i = 0; i < keyColumns2.size(); ++i) {
- result[i] = keyColumns2.get(i);
- }
- return result;
- }
-
- @SuppressWarnings("unchecked")
- private Class<? extends Key<?>>[] getKeyClassesArray() {
- return keyClasses.toArray(new Class[keyClasses.size()]);
- }
-
- /**
- * Adds additional key field.
- *
- * @param keyClass The class of the key data type.
- * @param keyColumn1 The position of the key in the first input's records.
- * @param keyColumn2 The position of the key in the second input's records.
- */
- public Builder keyField(Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
- keyClasses.add(keyClass);
- keyColumns1.add(keyColumn1);
- keyColumns2.add(keyColumn2);
- return this;
- }
-
- /**
- * Sets the first input.
- *
- * @param input The first input.
- */
- public Builder input1(Operator<Record> input) {
- Preconditions.checkNotNull(input, "The input must not be null");
-
- this.inputs1.clear();
- this.inputs1.add(input);
- return this;
- }
-
- /**
- * Sets the second input.
- *
- * @param input The second input.
- */
- public Builder input2(Operator<Record> input) {
- Preconditions.checkNotNull(input, "The input must not be null");
-
- this.inputs2.clear();
- this.inputs2.add(input);
- return this;
- }
-
- /**
- * Sets one or several inputs (union) for input 1.
- *
- * @param inputs
- */
- public Builder input1(Operator<Record>...inputs) {
- this.inputs1.clear();
- for (Operator<Record> c : inputs) {
- this.inputs1.add(c);
- }
- return this;
- }
-
- /**
- * Sets one or several inputs (union) for input 2.
- *
- * @param inputs
- */
- public Builder input2(Operator<Record>...inputs) {
- this.inputs2.clear();
- for (Operator<Record> c : inputs) {
- this.inputs2.add(c);
- }
- return this;
- }
-
- /**
- * Sets the first inputs.
- *
- * @param inputs
- */
- public Builder inputs1(List<Operator<Record>> inputs) {
- this.inputs1 = inputs;
- return this;
- }
-
- /**
- * Sets the second inputs.
- *
- * @param inputs
- */
- public Builder inputs2(List<Operator<Record>> inputs) {
- this.inputs2 = inputs;
- return this;
- }
-
- /**
- * Binds the result produced by a plan rooted at {@code root} to a
- * variable used by the UDF wrapped in this operator.
- */
- public Builder setBroadcastVariable(String name, Operator<Record> input) {
- this.broadcastInputs.put(name, input);
- return this;
- }
-
- /**
- * Binds multiple broadcast variables.
- */
- public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
- this.broadcastInputs.clear();
- this.broadcastInputs.putAll(inputs);
- return this;
- }
-
- /**
- * Sets the name of this operator.
- */
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Creates and returns a JoinOperator from using the values given
- * to the builder.
- *
- * @return The created operator
- */
- public JoinOperator build() {
- if (keyClasses.size() <= 0) {
- throw new IllegalStateException("At least one key attribute has to be set.");
- }
- if (name == null) {
- name = udf.getUserCodeClass().getName();
- }
- return new JoinOperator(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
deleted file mode 100644
index ed4175b..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.CollectorMapOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- *
- * MapOperator that applies a {@link MapFunction} to each record independently.
- *
- * @see MapFunction
- */
-
-@Deprecated
-public class MapOperator extends CollectorMapOperatorBase<Record, Record, MapFunction> implements RecordOperator {
-
- private static String DEFAULT_NAME = "<Unnamed Mapper>";
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a Builder with the provided {@link MapFunction} implementation.
- *
- * @param udf The {@link MapFunction} implementation for this Map operator.
- */
- public static Builder builder(MapFunction udf) {
- return new Builder(new UserCodeObjectWrapper<MapFunction>(udf));
- }
-
- /**
- * Creates a Builder with the provided {@link MapFunction} implementation.
- *
- * @param udf The {@link MapFunction} implementation for this Map operator.
- */
- public static Builder builder(Class<? extends MapFunction> udf) {
- return new Builder(new UserCodeClassWrapper<MapFunction>(udf));
- }
-
- /**
- * The private constructor that only gets invoked from the Builder.
- * @param builder
- */
- protected MapOperator(Builder builder) {
- super(builder.udf, OperatorInfoHelper.unary(), builder.name);
-
- if (builder.inputs != null && !builder.inputs.isEmpty()) {
- setInput(Operator.createUnionCascade(builder.inputs));
- }
-
- setBroadcastVariables(builder.broadcastInputs);
- setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
- }
-
-
- @Override
- public Class<? extends Key<?>>[] getKeyClasses() {
- return emptyClassArray();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
- */
- public static class Builder {
-
- /* The required parameters */
- private final UserCodeWrapper<MapFunction> udf;
-
- /* The optional parameters */
- private List<Operator<Record>> inputs;
- private Map<String, Operator<Record>> broadcastInputs;
- private String name = DEFAULT_NAME;
-
- /**
- * Creates a Builder with the provided {@link MapFunction} implementation.
- *
- * @param udf The {@link MapFunction} implementation for this Map operator.
- */
- private Builder(UserCodeWrapper<MapFunction> udf) {
- this.udf = udf;
- this.inputs = new ArrayList<Operator<Record>>();
- this.broadcastInputs = new HashMap<String, Operator<Record>>();
- }
-
- /**
- * Sets the input.
- *
- * @param input The input.
- */
- public Builder input(Operator<Record> input) {
- Preconditions.checkNotNull(input, "The input must not be null");
-
- this.inputs.clear();
- this.inputs.add(input);
- return this;
- }
-
- /**
- * Sets one or several inputs (union).
- *
- * @param inputs
- */
- public Builder input(Operator<Record>...inputs) {
- this.inputs.clear();
- for (Operator<Record> c : inputs) {
- this.inputs.add(c);
- }
- return this;
- }
-
- /**
- * Sets the inputs.
- *
- * @param inputs
- */
- public Builder inputs(List<Operator<Record>> inputs) {
- this.inputs = inputs;
- return this;
- }
-
- /**
- * Binds the result produced by a plan rooted at {@code root} to a
- * variable used by the UDF wrapped in this operator.
- */
- public Builder setBroadcastVariable(String name, Operator<Record> input) {
- this.broadcastInputs.put(name, input);
- return this;
- }
-
- /**
- * Binds multiple broadcast variables.
- */
- public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
- this.broadcastInputs.clear();
- this.broadcastInputs.putAll(inputs);
- return this;
- }
-
- /**
- * Sets the name of this operator.
- *
- * @param name
- */
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Creates and returns a MapOperator from using the values given
- * to the builder.
- *
- * @return The created operator
- */
- public MapOperator build() {
- if (name == null) {
- name = udf.getUserCodeClass().getName();
- }
- return new MapOperator(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
deleted file mode 100644
index 235e5ad..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.MapPartitionFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-/**
- *
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * MapPartitionOperator that applies a {@link MapPartitionFunction} to each record independently.
- *
- * @see MapPartitionFunction
- */
-
-@Deprecated
-public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
-
- private static String DEFAULT_NAME = "<Unnamed MapPartition>";
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
- *
- * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
- */
- public static Builder builder(MapPartitionFunction udf) {
- return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
- }
-
- /**
- * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
- *
- * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
- */
- public static Builder builder(Class<? extends MapPartitionFunction> udf) {
- return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
- }
-
- /**
- * The private constructor that only gets invoked from the Builder.
- * @param builder
- */
- protected MapPartitionOperator(Builder builder) {
-
- super(builder.udf, OperatorInfoHelper.unary(), builder.name);
-
- if (builder.inputs != null && !builder.inputs.isEmpty()) {
- setInput(Operator.createUnionCascade(builder.inputs));
- }
-
- setBroadcastVariables(builder.broadcastInputs);
- setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
- }
-
-
- @Override
- public Class<? extends Key<?>>[] getKeyClasses() {
- return emptyClassArray();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
- */
- public static class Builder {
-
- /* The required parameters */
- private final UserCodeWrapper<MapPartitionFunction> udf;
-
- /* The optional parameters */
- private List<Operator<Record>> inputs;
- private Map<String, Operator<Record>> broadcastInputs;
- private String name = DEFAULT_NAME;
-
- /**
- * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
- *
- * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
- */
- private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
- this.udf = udf;
- this.inputs = new ArrayList<Operator<Record>>();
- this.broadcastInputs = new HashMap<String, Operator<Record>>();
- }
-
- /**
- * Sets the input.
- *
- * @param input The input.
- */
- public Builder input(Operator<Record> input) {
- Preconditions.checkNotNull(input, "The input must not be null");
-
- this.inputs.clear();
- this.inputs.add(input);
- return this;
- }
-
- /**
- * Sets one or several inputs (union).
- *
- * @param inputs
- */
- public Builder input(Operator<Record>...inputs) {
- this.inputs.clear();
- for (Operator<Record> c : inputs) {
- this.inputs.add(c);
- }
- return this;
- }
-
- /**
- * Sets the inputs.
- *
- * @param inputs
- */
- public Builder inputs(List<Operator<Record>> inputs) {
- this.inputs = inputs;
- return this;
- }
-
- /**
- * Binds the result produced by a plan rooted at {@code root} to a
- * variable used by the UDF wrapped in this operator.
- */
- public Builder setBroadcastVariable(String name, Operator<Record> input) {
- this.broadcastInputs.put(name, input);
- return this;
- }
-
- /**
- * Binds multiple broadcast variables.
- */
- public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
- this.broadcastInputs.clear();
- this.broadcastInputs.putAll(inputs);
- return this;
- }
-
- /**
- * Sets the name of this operator.
- *
- * @param name
- */
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Creates and returns a MapOperator from using the values given
- * to the builder.
- *
- * @return The created operator
- */
- public MapPartitionOperator build() {
- if (name == null) {
- name = udf.getUserCodeClass().getName();
- }
- return new MapPartitionOperator(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java
deleted file mode 100644
index 7b2bb5e..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Record;
-
-/**
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- *
- */
-@Deprecated
-public class OperatorInfoHelper {
-
- public static OperatorInformation<Record> source() {
- return new OperatorInformation<Record>(new RecordTypeInfo());
- }
-
- public static UnaryOperatorInformation<Record, Record> unary() {
- return new UnaryOperatorInformation<Record, Record>(new RecordTypeInfo(), new RecordTypeInfo());
- }
-
- public static BinaryOperatorInformation<Record, Record, Record> binary() {
- return new BinaryOperatorInformation<Record, Record, Record>(new RecordTypeInfo(), new RecordTypeInfo(), new RecordTypeInfo());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
deleted file mode 100644
index 05905cb..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.operators;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.google.common.base.Preconditions;
-
-/**
- * ReduceOperator evaluating a {@link ReduceFunction} over each group of records that share the same key.
- *
- * @see ReduceFunction
- */
-@SuppressWarnings("deprecation")
-public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, GroupReduceFunction<Record, Record>> implements RecordOperator {
-
- private static final String DEFAULT_NAME = "<Unnamed Reducer>"; // the default name for contracts
-
- /**
- * The types of the keys that the contract operates on.
- */
- private final Class<? extends Key<?>>[] keyTypes;
-
- private final UserCodeWrapper<ReduceFunction> originalFunction;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a Builder with the provided {@link ReduceFunction} implementation.
- *
- * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
- */
- public static Builder builder(ReduceFunction udf) {
- UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
- UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
- new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
-
- return new Builder(original, wrapped);
- }
-
- /**
- * Creates a Builder with the provided {@link ReduceFunction} implementation.
- *
- * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
- * @param keyClass The class of the key data type.
- * @param keyColumn The position of the key.
- */
- public static Builder builder(ReduceFunction udf, Class<? extends Key<?>> keyClass, int keyColumn) {
- UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
- UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
- new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
-
- return new Builder(original, wrapped, keyClass, keyColumn);
- }
-
- /**
- * Creates a Builder with the provided {@link ReduceFunction} implementation.
- *
- * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
- */
- public static Builder builder(Class<? extends ReduceFunction> udf) {
- UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
- UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
- new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
-
- return new Builder(original, wrapped);
- }
-
- /**
- * Creates a Builder with the provided {@link ReduceFunction} implementation.
- *
- * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
- * @param keyClass The class of the key data type.
- * @param keyColumn The position of the key.
- */
- public static Builder builder(Class<? extends ReduceFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn) {
- UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
- UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
- new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
-
- return new Builder(original, wrapped, keyClass, keyColumn);
- }
-
- /**
- * The private constructor that only gets invoked from the Builder.
- * @param builder
- */
- protected ReduceOperator(Builder builder) {
- super(builder.udfWrapper, OperatorInfoHelper.unary(), builder.getKeyColumnsArray(), builder.name);
-
- this.keyTypes = builder.getKeyClassesArray();
- this.originalFunction = builder.originalUdf;
-
- if (builder.inputs != null && !builder.inputs.isEmpty()) {
- setInput(Operator.createUnionCascade(builder.inputs));
- }
-
- setGroupOrder(builder.secondaryOrder);
- setBroadcastVariables(builder.broadcastInputs);
-
- setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(originalFunction));
- }
-
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public Class<? extends Key<?>>[] getKeyClasses() {
- return this.keyTypes;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean isCombinable() {
- return super.isCombinable() || originalFunction.getUserCodeAnnotation(Combinable.class) != null;
- }
-
- /**
- * This annotation marks reduce stubs as eligible for the usage of a combiner.
- *
- * The following code excerpt shows how to make a simple reduce stub combinable (assuming here that
- * the reducer function and combiner function do the same):
- *
- * <code>
- * \@Combinable
- * public static class CountWords extends ReduceFunction<StringValue>
- * {
- * private final IntValue theInteger = new IntValue();
- *
- * \@Override
- * public void reduce(StringValue key, Iterator<Record> records, Collector out) throws Exception
- * {
- * Record element = null;
- * int sum = 0;
- * while (records.hasNext()) {
- * element = records.next();
- * element.getField(1, this.theInteger);
- * // we could have equivalently used IntValue i = record.getField(1, IntValue.class);
- *
- * sum += this.theInteger.getValue();
- * }
- *
- * element.setField(1, this.theInteger);
- * out.collect(element);
- * }
- *
- * public void combine(StringValue key, Iterator<Record> records, Collector out) throws Exception
- * {
- * this.reduce(key, records, out);
- * }
- * }
- * </code>
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.TYPE)
- public static @interface Combinable {};
-
- // --------------------------------------------------------------------------------------------
-
-
- /**
- * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
- */
- public static class Builder {
-
- /* The required parameters */
- private final UserCodeWrapper<ReduceFunction> originalUdf;
- private final UserCodeWrapper<GroupReduceFunction<Record, Record>> udfWrapper;
- private final List<Class<? extends Key<?>>> keyClasses;
- private final List<Integer> keyColumns;
-
- /* The optional parameters */
- private Ordering secondaryOrder = null;
- private List<Operator<Record>> inputs;
- private Map<String, Operator<Record>> broadcastInputs;
- private String name = DEFAULT_NAME;
-
- /**
- * Creates a Builder with the provided {@link ReduceFunction} implementation.
- *
- * @param wrappedUdf The {@link ReduceFunction} implementation for this Reduce contract.
- */
- private Builder(UserCodeWrapper<ReduceFunction> originalUdf, UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf) {
- this.originalUdf = originalUdf;
- this.udfWrapper = wrappedUdf;
- this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
- this.keyColumns = new ArrayList<Integer>();
- this.inputs = new ArrayList<Operator<Record>>();
- this.broadcastInputs = new HashMap<String, Operator<Record>>();
- }
-
- /**
- * Creates a Builder with the provided {@link ReduceFunction} implementation.
- *
- * @param wrappedUdf The {@link ReduceFunction} implementation for this Reduce contract.
- * @param keyClass The class of the key data type.
- * @param keyColumn The position of the key.
- */
- private Builder(UserCodeWrapper<ReduceFunction> originalUdf,
- UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf,
- Class<? extends Key<?>> keyClass, int keyColumn)
- {
- this.originalUdf = originalUdf;
- this.udfWrapper = wrappedUdf;
- this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
- this.keyClasses.add(keyClass);
- this.keyColumns = new ArrayList<Integer>();
- this.keyColumns.add(keyColumn);
- this.inputs = new ArrayList<Operator<Record>>();
- this.broadcastInputs = new HashMap<String, Operator<Record>>();
- }
-
- private int[] getKeyColumnsArray() {
- int[] result = new int[keyColumns.size()];
- for (int i = 0; i < keyColumns.size(); ++i) {
- result[i] = keyColumns.get(i);
- }
- return result;
- }
-
- @SuppressWarnings("unchecked")
- private Class<? extends Key<?>>[] getKeyClassesArray() {
- return keyClasses.toArray(new Class[keyClasses.size()]);
- }
-
- /**
- * Adds additional key field.
- *
- * @param keyClass The class of the key data type.
- * @param keyColumn The position of the key.
- */
- public Builder keyField(Class<? extends Key<?>> keyClass, int keyColumn) {
- keyClasses.add(keyClass);
- keyColumns.add(keyColumn);
- return this;
- }
-
- /**
- * Sets the order of the elements within a group.
- *
- * @param order The order for the elements in a group.
- */
- public Builder secondaryOrder(Ordering order) {
- this.secondaryOrder = order;
- return this;
- }
-
- /**
- * Sets the input.
- *
- * @param input The input.
- */
- public Builder input(Operator<Record> input) {
- Preconditions.checkNotNull(input, "The input must not be null");
-
- this.inputs.clear();
- this.inputs.add(input);
- return this;
- }
-
- /**
- * Sets one or several inputs (union).
- *
- * @param inputs
- */
- public Builder input(Operator<Record>...inputs) {
- this.inputs.clear();
- Collections.addAll(this.inputs, inputs);
- return this;
- }
-
- /**
- * Sets the inputs.
- *
- * @param inputs
- */
- public Builder inputs(List<Operator<Record>> inputs) {
- this.inputs = inputs;
- return this;
- }
-
- /**
- * Binds the result produced by a plan rooted at {@code root} to a
- * variable used by the UDF wrapped in this operator.
- */
- public Builder setBroadcastVariable(String name, Operator<Record> input) {
- this.broadcastInputs.put(name, input);
- return this;
- }
-
- /**
- * Binds multiple broadcast variables.
- */
- public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
- this.broadcastInputs.clear();
- this.broadcastInputs.putAll(inputs);
- return this;
- }
-
- /**
- * Sets the name of this operator.
- *
- * @param name
- */
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Creates and returns a ReduceOperator from using the values given
- * to the builder.
- *
- * @return The created operator
- */
- public ReduceOperator build() {
- if (name == null) {
- name = udfWrapper.getUserCodeClass().getName();
- }
- return new ReduceOperator(this);
- }
- }
-
- // ============================================================================================
-
- public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> {
-
- private static final long serialVersionUID = 1L;
-
- public WrappingReduceFunction(ReduceFunction reducer) {
- super(reducer);
- }
-
- @Override
- public final void reduce(Iterable<Record> records, Collector<Record> out) throws Exception {
- this.wrappedFunction.reduce(records.iterator(), out);
- }
-
- @Override
- public final void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
- this.wrappedFunction.combine(records.iterator(), out);
- }
- }
-
- public static final class WrappingClassReduceFunction extends WrappingReduceFunction {
-
- private static final long serialVersionUID = 1L;
-
- public WrappingClassReduceFunction(Class<? extends ReduceFunction> reducer) {
- super(InstantiationUtil.instantiate(reducer));
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeObject(wrappedFunction.getClass());
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- Class<?> clazz = (Class<?>) in.readObject();
- this.wrappedFunction = (ReduceFunction) InstantiationUtil.instantiate(clazz);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
deleted file mode 100644
index 3f7e7c5..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings({ "serial", "deprecation" })
-public class CoGroupWrappingFunctionTest {
-
- @SuppressWarnings("unchecked")
- @Test
- public void testWrappedCoGroupObject() {
- try {
- AtomicInteger methodCounter = new AtomicInteger();
-
- CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(methodCounter), LongValue.class, 1, 2).build();
-
- RichFunction cogrouper = (RichFunction) coGroupOp.getUserCodeWrapper().getUserCodeObject();
-
- // test the method invocations
- cogrouper.close();
- cogrouper.open(new Configuration());
- assertEquals(2, methodCounter.get());
-
- // prepare the coGroup
- final List<Record> target = new ArrayList<Record>();
- Collector<Record> collector = new Collector<Record>() {
- @Override
- public void collect(Record record) {
- target.add(record);
- }
- @Override
- public void close() {}
- };
-
- List<Record> source1 = new ArrayList<Record>();
- source1.add(new Record(new IntValue(42)));
- source1.add(new Record(new IntValue(13)));
-
- List<Record> source2 = new ArrayList<Record>();
- source2.add(new Record(new LongValue(11)));
- source2.add(new Record(new LongValue(17)));
-
- // test coGroup
- ((org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>) cogrouper).coGroup(source1, source2, collector);
- assertEquals(4, target.size());
- assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
- assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
- assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
- assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
- target.clear();
-
- // test the serialization
- SerializationUtils.clone((java.io.Serializable) cogrouper);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWrappedCoGroupClass() {
- try {
- CoGroupOperator coGroupOp = CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 2).build();
-
- UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf = coGroupOp.getUserCodeWrapper();
- UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> copy = SerializationUtils.clone(udf);
- org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> cogrouper = copy.getUserCodeObject();
-
- // prepare the coGpuü
- final List<Record> target = new ArrayList<Record>();
- Collector<Record> collector = new Collector<Record>() {
- @Override
- public void collect(Record record) {
- target.add(record);
- }
- @Override
- public void close() {}
- };
-
- List<Record> source1 = new ArrayList<Record>();
- source1.add(new Record(new IntValue(42)));
- source1.add(new Record(new IntValue(13)));
-
- List<Record> source2 = new ArrayList<Record>();
- source2.add(new Record(new LongValue(11)));
- source2.add(new Record(new LongValue(17)));
-
- // test coGroup
- cogrouper.coGroup(source1, source2, collector);
- assertEquals(4, target.size());
- assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
- assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
- assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
- assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
- target.clear();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testExtractSemantics() {
- try {
- {
- CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(), LongValue.class, 1, 2).build();
-
- DualInputSemanticProperties props = coGroupOp.getSemanticProperties();
- FieldSet fw2 = props.getForwardingTargetFields(0, 2);
- FieldSet fw4 = props.getForwardingTargetFields(1, 4);
- assertNotNull(fw2);
- assertNotNull(fw4);
- assertEquals(1, fw2.size());
- assertEquals(1, fw4.size());
- assertTrue(fw2.contains(2));
- assertTrue(fw4.contains(4));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- @ConstantFieldsFirst(2)
- @ConstantFieldsSecond(4)
- public static class TestCoGroupFunction extends CoGroupFunction {
-
- private final AtomicInteger methodCounter;
-
- private TestCoGroupFunction(AtomicInteger methodCounter) {
- this.methodCounter= methodCounter;
- }
-
- public TestCoGroupFunction() {
- methodCounter = new AtomicInteger();
- }
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception {
- while (records1.hasNext()) {
- out.collect(records1.next());
- }
- while (records2.hasNext()) {
- out.collect(records2.next());
- }
- }
-
- @Override
- public void close() throws Exception {
- methodCounter.incrementAndGet();
- super.close();
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- methodCounter.incrementAndGet();
- super.open(parameters);
- }
- };
-}