Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/26 01:20:41 UTC

[3/8] flink git commit: [FLINK-2906] Remove Record API

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
deleted file mode 100644
index a31448d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Record;
-
-import java.util.Arrays;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * A DeltaIteration is similar to a {@link BulkIteration},
- * but maintains state across the individual iteration steps. The state, called the <i>solution set</i>, can be obtained
- * via {@link #getSolutionSet()} and accessed by joining (or co-grouping) with it. The solution
- * set is updated by producing a delta for it, which is merged into the solution set at the end of each iteration step.
- * <p>
- * The delta iteration must be closed by setting a delta for the solution set ({@link #setSolutionSetDelta(org.apache.flink.api.common.operators.Operator)})
- * and the new workset (the data set that will be fed back, {@link #setNextWorkset(org.apache.flink.api.common.operators.Operator)}).
- * The DeltaIteration itself represents the result after the iteration has terminated.
- * Delta iterations terminate when the fed-back data set (the workset) is empty.
- * In addition, a maximum number of steps is given as a fall-back termination guard.
- * <p>
- * Elements in the solution set are uniquely identified by a key. When merging the solution set delta, contained elements
- * with the same key are replaced.
- * <p>
- * This class is a subclass of {@code DualInputOperator}. The solution set is considered the first input, the
- * workset is considered the second input.
- */
-@Deprecated
-public class DeltaIteration extends DeltaIterationBase<Record, Record> {
-
-	public DeltaIteration(int keyPosition) {
-		super(OperatorInfoHelper.binary(), keyPosition);
-	}
-	
-	public DeltaIteration(int[] keyPositions) {
-		super(OperatorInfoHelper.binary(), keyPositions);
-	}
-
-	public DeltaIteration(int keyPosition, String name) {
-		super(OperatorInfoHelper.binary(), keyPosition, name);
-	}
-	
-	public DeltaIteration(int[] keyPositions, String name) {
-		super(OperatorInfoHelper.binary(), keyPositions, name);
-	}
-
-	/**
-	 * Specialized operator to use as a recognizable place-holder for the working set input to the
-	 * step function.
-	 */
-	public static class WorksetPlaceHolder extends DeltaIterationBase.WorksetPlaceHolder<Record> {
-		public WorksetPlaceHolder(DeltaIterationBase<?, Record> container) {
-			super(container, new OperatorInformation<Record>(new RecordTypeInfo()));
-		}
-	}
-
-	/**
-	 * Specialized operator to use as a recognizable place-holder for the solution set input to the
-	 * step function.
-	 */
-	public static class SolutionSetPlaceHolder extends DeltaIterationBase.SolutionSetPlaceHolder<Record> {
-		public SolutionSetPlaceHolder(DeltaIterationBase<Record, ?> container) {
-			super(container, new OperatorInformation<Record>(new RecordTypeInfo()));
-		}
-
-		public void checkJoinKeyFields(int[] keyFields) {
-			int[] ssKeys = containingIteration.getSolutionSetKeyFields();
-			if (!Arrays.equals(ssKeys, keyFields)) {
-				throw new InvalidProgramException("The solution set can only be joined/co-grouped with the same keys as its elements are identified by (here: " + Arrays.toString(ssKeys) + ").");
-			}
-		}
-	}
-}
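
The construct described in the deleted Javadoc above survives in the DataSet API as DataSet#iterateDelta. Below is a minimal migration sketch; the Tuple2 data sets, the key field 0, and the trivial step function are illustrative assumptions, not part of this commit:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DeltaIterationSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Illustrative inputs; solution set elements are identified by key field 0.
            DataSet<Tuple2<Long, Long>> solutionSet = env.fromElements(Tuple2.of(1L, 0L));
            DataSet<Tuple2<Long, Long>> workset = env.fromElements(Tuple2.of(1L, 1L));

            // 100 is the fall-back maximum number of iteration steps.
            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                    solutionSet.iterateDelta(workset, 100, 0);

            // The solution set may only be joined/co-grouped on its key fields,
            // mirroring the checkJoinKeyFields() guard in the deleted class.
            DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
                    .join(iteration.getSolutionSet())
                    .where(0).equalTo(0)
                    .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        @Override
                        public Tuple2<Long, Long> join(Tuple2<Long, Long> ws, Tuple2<Long, Long> ss) {
                            return Tuple2.of(ws.f0, ws.f1 + ss.f1);
                        }
                    });

            // Close with the solution set delta and the next workset; the iteration
            // terminates once the workset is empty.
            DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, delta);
            result.print();
        }
    }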

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
deleted file mode 100644
index 9b790e6..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.FileDataSinkBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Nothing;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * Operator for nodes which act as data sinks, storing the data they receive in a file instead of sending it to another
- * contract. The encoding of the data in the file is handled by the {@link FileOutputFormat}.
- * 
- * @see FileOutputFormat
- */
-@Deprecated
-public class FileDataSink extends FileDataSinkBase<Record> {
-
-	private static final String DEFAULT_NAME = "<Unnamed File Data Sink>";
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation 
-	 * and the given name, writing to the file indicated by the given path.
-	 * 
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public FileDataSink(FileOutputFormat<Record> f, String filePath, String name) {
-		super(f,  new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()), filePath, name);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation
-	 * and a default name, writing to the file indicated by the given path.
-	 * 
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 */
-	public FileDataSink(FileOutputFormat<Record> f, String filePath) {
-		this(f, filePath, DEFAULT_NAME);
-	}
-	
-	
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the default name,
-	 * writing to the file indicated by the given path. It uses the given contract as its input.
-	 * 
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contract to use as the input.
-	 */
-	public FileDataSink(FileOutputFormat<Record> f, String filePath, Operator<Record> input) {
-		this(f, filePath);
-		setInput(input);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the default name,
-	 * writing to the file indicated by the given path. It uses the given contracts as its input.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contracts to use as the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public FileDataSink(FileOutputFormat<Record> f, String filePath, List<Operator<Record>> input) {
-		this(f, filePath, input, DEFAULT_NAME);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
-	 * writing to the file indicated by the given path. It uses the given contract as its input.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contract to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public FileDataSink(FileOutputFormat<Record> f, String filePath, Operator<Record> input, String name) {
-		this(f, filePath, name);
-		setInput(input);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
-	 * writing to the file indicated by the given path. It uses the given contracts as its input.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contracts to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public FileDataSink(FileOutputFormat<Record> f, String filePath, List<Operator<Record>> input, String name) {
-		this(f, filePath, name);
-		Preconditions.checkNotNull(input, "The input must not be null.");
-		setInput(Operator.createUnionCascade(input));
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation
-	 * and the given name, writing to the file indicated by the given path.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, String name) {
-		super(new UserCodeClassWrapper<FileOutputFormat<Record>>(f),
-				new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()),
-				filePath, name);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation
-	 * and a default name, writing to the file indicated by the given path.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 */
-	public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath) {
-		this(f, filePath, DEFAULT_NAME);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the default name,
-	 * writing to the file indicated by the given path. It uses the given contract as its input.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contract to use as the input.
-	 */
-	public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, Operator<Record> input) {
-		this(f, filePath, input, DEFAULT_NAME);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the default name,
-	 * writing to the file indicated by the given path. It uses the given contracts as its input.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contracts to use as the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, List<Operator<Record>> input) {
-		this(f, filePath, input, DEFAULT_NAME);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
-	 * writing to the file indicated by the given path. It uses the given contract as its input.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contract to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, Operator<Record> input, String name) {
-		this(f, filePath, name);
-		setInput(input);
-	}
-
-	/**
-	 * Creates a FileDataSink with the provided {@link FileOutputFormat} implementation and the given name,
-	 * writing to the file indicated by the given path. It uses the given contracts as its input.
-	 *
-	 * @param f The {@link FileOutputFormat} implementation used to encode the data.
-	 * @param filePath The path to the file to write the contents to.
-	 * @param input The contracts to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public FileDataSink(Class<? extends FileOutputFormat<Record>> f, String filePath, List<Operator<Record>> input, String name) {
-		this(f, filePath, name);
-		Preconditions.checkNotNull(input, "The inputs must not be null.");
-		setInput(Operator.createUnionCascade(input));
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Returns the configured file path where the output is written to.
-	 * 
-	 * @return The path to which the output shall be written.
-	 */
-	public String getFilePath() {
-		return this.filePath;
-	}
-	
-
-	@Override
-	public String toString() {
-		return this.filePath;
-	}
-	
-}
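
The FileDataSink constructors above pair a FileOutputFormat with an output path. In the DataSet API this collapses to DataSet#write(format, path), or convenience methods such as writeAsText. A minimal sketch, with an illustrative output path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class FileSinkSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> data = env.fromElements("a", "b", "c");

            // writeAsText wraps a TextOutputFormat; any FileOutputFormat can be
            // supplied explicitly via data.write(format, path).
            data.writeAsText("file:///tmp/sink-sketch");
            env.execute("file sink sketch");
        }
    }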

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java
deleted file mode 100644
index d5132b8..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSource.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.operators.base.FileDataSourceBase;
-import org.apache.flink.types.Record;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * Operator for input nodes that read data from files (for the Record data model).
- */
-@Deprecated
-public class FileDataSource extends FileDataSourceBase<Record> {
-
-	/**
-	 * Creates a new instance for the given file using the given file input format.
-	 * 
-	 * @param f The {@link FileInputFormat} implementation used to read the data.
-	 * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
-	 * @param name The given name for the Pact, used in plans, logs and progress messages.
-	 */
-	public FileDataSource(FileInputFormat<Record> f, String filePath, String name) {
-		super(f, OperatorInfoHelper.source(), filePath, name);
-	}
-
-	/**
-	 * Creates a new instance for the given file using the given input format. The contract has the default name.
-	 * 
-	 * @param f The {@link FileInputFormat} implementation used to read the data.
-	 * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
-	 */
-	public FileDataSource(FileInputFormat<Record> f, String filePath) {
-		super(f, OperatorInfoHelper.source(), filePath);
-	}
-	
-	/**
-	 * Creates a new instance for the given file using the given file input format.
-	 * 
-	 * @param f The {@link FileInputFormat} implementation used to read the data.
-	 * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
-	 * @param name The given name for the Pact, used in plans, logs and progress messages.
-	 */
-	public FileDataSource(Class<? extends FileInputFormat<Record>> f, String filePath, String name) {
-		super(f, OperatorInfoHelper.source(), filePath, name);
-	}
-
-	/**
-	 * Creates a new instance for the given file using the given input format. The contract has the default name.
-	 * 
-	 * @param f The {@link FileInputFormat} implementation used to read the data.
-	 * @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
-	 */
-	public FileDataSource(Class<? extends FileInputFormat<Record>> f, String filePath) {
-		super(f, OperatorInfoHelper.source(), filePath);
-	}
-}
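
FileDataSource bound a FileInputFormat to a fully qualified input path. The DataSet-API counterparts are ExecutionEnvironment#readFile(format, path) and shortcuts such as readTextFile. A minimal sketch, with an illustrative path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class FileSourceSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // The path must be a fully qualified URI, including the scheme,
            // just as the deleted Javadoc requires.
            DataSet<String> lines = env.readTextFile("file:///tmp/source-sketch");
            lines.print();
        }
    }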

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
deleted file mode 100644
index 22ae477..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Nothing;
-import org.apache.flink.types.Record;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * Operator for nodes that act as data sinks, storing the data they receive.
- * The way the data is stored is handled by the {@link OutputFormat}.
- */
-
-@Deprecated
-public class GenericDataSink extends GenericDataSinkBase<Record> {
-
-	private static final String DEFAULT_NAME = "<Unnamed Generic Record Data Sink>";
-
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation 
-	 * and the given name. 
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public GenericDataSink(OutputFormat<Record> f, String name) {
-		super(f, new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()), name);
-	}
-
-
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation
-	 * and a default name.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 */
-	public GenericDataSink(OutputFormat<Record> f) {
-		this(f, DEFAULT_NAME);
-	}
-	
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the default name.
-	 * It uses the given operator as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The operator to use as the input.
-	 */
-	public GenericDataSink(OutputFormat<Record> f, Operator<Record> input) {
-		this(f, input, DEFAULT_NAME);
-	}
-	
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the default name.
-	 * It uses the given contracts as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The contracts to use as the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public GenericDataSink(OutputFormat<Record> f, List<Operator<Record>> input) {
-		this(f, input, DEFAULT_NAME);
-	}
-
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
-	 * It uses the given operator as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The operator to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public GenericDataSink(OutputFormat<Record> f, Operator<Record> input, String name) {
-		this(f, name);
-		setInput(input);
-	}
-
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
-	 * It uses the given contracts as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The contracts to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public GenericDataSink(OutputFormat<Record> f, List<Operator<Record>> input, String name) {
-		this(f, name);
-		setInputs(input);
-	}
-	
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation 
-	 * and the given name. 
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public GenericDataSink(Class<? extends OutputFormat<Record>> f, String name) {
-		super(new UserCodeClassWrapper<OutputFormat<Record>>(f),
-				new UnaryOperatorInformation<Record, Nothing>(new RecordTypeInfo(), new NothingTypeInfo()), name);
-	}
-
-	
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation
-	 * and a default name.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 */
-	public GenericDataSink(Class<? extends OutputFormat<Record>> f) {
-		this(f, DEFAULT_NAME);
-	}
-	
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the default name.
-	 * It uses the given operator as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The operator to use as the input.
-	 */
-	public GenericDataSink(Class<? extends OutputFormat<Record>> f, Operator<Record> input) {
-		this(f, input, DEFAULT_NAME);
-	}
-	
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the default name.
-	 * It uses the given contracts as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The contracts to use as the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public GenericDataSink(Class<? extends OutputFormat<Record>> f, List<Operator<Record>> input) {
-		this(f, input, DEFAULT_NAME);
-	}
-
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
-	 * It uses the given operator as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The operator to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 */
-	public GenericDataSink(Class<? extends OutputFormat<Record>> f, Operator<Record> input, String name) {
-		this(f, name);
-		setInput(input);
-	}
-
-	/**
-	 * Creates a GenericDataSink with the provided {@link OutputFormat} implementation and the given name.
-	 * It uses the given contracts as its input.
-	 * 
-	 * @param f The {@link OutputFormat} implementation used to sink the data.
-	 * @param input The contracts to use as the input.
-	 * @param name The given name for the sink, used in plans, logs and progress messages.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public GenericDataSink(Class<? extends OutputFormat<Record>> f, List<Operator<Record>> input, String name) {
-		this(f, name);
-		setInputs(input);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Sets the input to the union of the given operators.
-	 * 
-	 * @param inputs The operator(s) that form the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public void setInputs(Operator<Record>... inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
-		this.input = Operator.createUnionCascade(inputs);
-	}
-	
-	/**
-	 * Sets the input to the union of the given operators.
-	 * 
-	 * @param inputs The operator(s) that form the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public void setInputs(List<Operator<Record>> inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
-		this.input = Operator.createUnionCascade(inputs);
-	}
-	
-	/**
-	 * Adds to the input the union of the given operators.
-	 * 
-	 * @param inputs The operator(s) to be unioned with the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@Deprecated
-	public void addInput(Operator<Record>... inputs) {
-		Preconditions.checkNotNull(inputs, "The input may not be null.");
-		this.input = Operator.createUnionCascade(this.input, inputs);
-	}
-
-	/**
-	 * Adds to the input the union of the given operators.
-	 * 
-	 * @param inputs The operator(s) to be unioned with the input.
-	 * @deprecated This method will be removed in future versions. Use the {@link org.apache.flink.api.common.operators.Union} operator instead.
-	 */
-	@SuppressWarnings("unchecked")
-	@Deprecated
-	public void addInputs(List<? extends Operator<Record>> inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
-		this.input = createUnionCascade(this.input, (Operator<Record>[]) inputs.toArray(new Operator[inputs.size()]));
-	}
-}
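
GenericDataSink attached an arbitrary OutputFormat, with its multi-input setInputs/addInput variants already deprecated in favor of explicit unions. In the DataSet API the same shape is expressed with DataSet#union and DataSet#output. A minimal sketch:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.PrintingOutputFormat;

    public class GenericSinkSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> first = env.fromElements("a", "b");
            DataSet<String> second = env.fromElements("c");

            // An explicit union replaces the deprecated multi-input constructors.
            first.union(second).output(new PrintingOutputFormat<String>());
            env.execute("generic sink sketch");
        }
    }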

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java
deleted file mode 100644
index 39e7e3b..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.types.Record;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * Operator for data sources in a Pact plan.
- *
- * @param <T> The type of input format invoked by instances of this data source.
- */
-
-@Deprecated
-public class GenericDataSource<T extends InputFormat<Record, ?>> extends GenericDataSourceBase<Record, T> {
-
-	/**
-	 * Creates a new instance for the given file using the given input format.
-	 * 
-	 * @param format The {@link InputFormat} implementation used to read the data.
-	 * @param name The given name for the Pact, used in plans, logs and progress messages.
-	 */
-	public GenericDataSource(T format, String name) {
-		super(format, OperatorInfoHelper.source(), name);
-	}
-	
-	/**
-	 * Creates a new instance for the given file using the given input format, using the default name.
-	 * 
-	 * @param format The {@link InputFormat} implementation used to read the data.
-	 */
-	public GenericDataSource(T format) {
-		super(format, OperatorInfoHelper.source());
-	}
-	
-	/**
-	 * Creates a new instance for the given file using the given input format.
-	 * 
-	 * @param format The {@link InputFormat} implementation used to read the data.
-	 * @param name The given name for the Pact, used in plans, logs and progress messages.
-	 */
-	public GenericDataSource(Class<? extends T> format, String name) {
-		super(format, OperatorInfoHelper.source(), name);
-	}
-	
-	/**
-	 * Creates a new instance for the given file using the given input format, using the default name.
-	 * 
-	 * @param format The {@link InputFormat} implementation used to read the data.
-	 */
-	public GenericDataSource(Class<? extends T> format) {
-		super(format, OperatorInfoHelper.source());
-	}
-}
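
GenericDataSource wrapped an arbitrary InputFormat producing Records; the DataSet-API equivalent is ExecutionEnvironment#createInput. A minimal sketch using TextInputFormat, with an illustrative path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    public class GenericSourceSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // createInput accepts any InputFormat; the element type is
            // derived from the format's type parameters.
            DataSet<String> lines =
                    env.createInput(new TextInputFormat(new Path("file:///tmp/source-sketch")));
            lines.print();
        }
    }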

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
deleted file mode 100644
index 75744fe..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * JoinOperator that applies a {@link JoinFunction} to each pair of records from both inputs
- * that have matching keys.
- */
-
-@Deprecated
-public class JoinOperator extends InnerJoinOperatorBase<Record, Record, Record, JoinFunction> implements RecordOperator {
-	
-	/**
-	 * The types of the keys that the operator operates on.
-	 */
-	private final Class<? extends Key<?>>[] keyTypes;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a Builder with the provided {@link JoinFunction} implementation
-	 * 
-	 * @param udf The {@link JoinFunction} implementation for this join.
-	 * @param keyClass The class of the key data type.
-	 * @param keyColumn1 The position of the key in the first input's records.
-	 * @param keyColumn2 The position of the key in the second input's records.
-	 */
-	public static Builder builder(JoinFunction udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-		return new Builder(new UserCodeObjectWrapper<JoinFunction>(udf), keyClass, keyColumn1, keyColumn2);
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link JoinFunction} implementation
-	 * 
-	 * @param udf The {@link JoinFunction} implementation for this Match operator.
-	 * @param keyClass The class of the key data type.
-	 * @param keyColumn1 The position of the key in the first input's records.
-	 * @param keyColumn2 The position of the key in the second input's records.
-	 */
-	public static Builder builder(Class<? extends JoinFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-		return new Builder(new UserCodeClassWrapper<JoinFunction>(udf), keyClass, keyColumn1, keyColumn2);
-	}
-	
-	/**
-	 * The constructor that is only invoked from the Builder.
-	 * @param builder The builder holding this operator's configuration.
-	 */
-	protected JoinOperator(Builder builder) {
-		super(builder.udf, OperatorInfoHelper.binary(), builder.getKeyColumnsArray1(), builder.getKeyColumnsArray2(), builder.name);
-		this.keyTypes = builder.getKeyClassesArray();
-		
-		if (builder.inputs1 != null && !builder.inputs1.isEmpty()) {
-			setFirstInput(Operator.createUnionCascade(builder.inputs1));
-		}
-		if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
-			setSecondInput(Operator.createUnionCascade(builder.inputs2));
-		}
-
-		// sanity check solution set key mismatches
-		if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
-			int[] positions = getKeyColumns(0);
-			((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions);
-		}
-		if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
-			int[] positions = getKeyColumns(1);
-			((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions);
-		}
-		
-		setBroadcastVariables(builder.broadcastInputs);
-		setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));
-	}
-	
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return this.keyTypes;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-		
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder {
-		
-		/* The required parameters */
-		private final UserCodeWrapper<JoinFunction> udf;
-		private final List<Class<? extends Key<?>>> keyClasses;
-		private final List<Integer> keyColumns1;
-		private final List<Integer> keyColumns2;
-		
-		/* The optional parameters */
-		private List<Operator<Record>> inputs1;
-		private List<Operator<Record>> inputs2;
-		private Map<String, Operator<Record>> broadcastInputs;
-		private String name;
-		
-		
-		/**
-		 * Creates a Builder with the provided {@link JoinFunction} implementation
-		 * 
-		 * @param udf The {@link JoinFunction} implementation for this Match operator.
-		 * @param keyClass The class of the key data type.
-		 * @param keyColumn1 The position of the key in the first input's records.
-		 * @param keyColumn2 The position of the key in the second input's records.
-		 */
-		protected Builder(UserCodeWrapper<JoinFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-			this.udf = udf;
-			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
-			this.keyClasses.add(keyClass);
-			this.keyColumns1 = new ArrayList<Integer>();
-			this.keyColumns1.add(keyColumn1);
-			this.keyColumns2 = new ArrayList<Integer>();
-			this.keyColumns2.add(keyColumn2);
-			this.inputs1 = new ArrayList<Operator<Record>>();
-			this.inputs2 = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		/**
-		 * Creates a Builder with the provided {@link JoinFunction} implementation. This method is intended 
-		 * for special case sub-types only.
-		 * 
-		 * @param udf The {@link JoinFunction} implementation for this Match operator.
-		 */
-		protected Builder(UserCodeWrapper<JoinFunction> udf) {
-			this.udf = udf;
-			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
-			this.keyColumns1 = new ArrayList<Integer>();
-			this.keyColumns2 = new ArrayList<Integer>();
-			this.inputs1 = new ArrayList<Operator<Record>>();
-			this.inputs2 = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		private int[] getKeyColumnsArray1() {
-			int[] result = new int[keyColumns1.size()];
-			for (int i = 0; i < keyColumns1.size(); ++i) {
-				result[i] = keyColumns1.get(i);
-			}
-			return result;
-		}
-		
-		private int[] getKeyColumnsArray2() {
-			int[] result = new int[keyColumns2.size()];
-			for (int i = 0; i < keyColumns2.size(); ++i) {
-				result[i] = keyColumns2.get(i);
-			}
-			return result;
-		}
-		
-		@SuppressWarnings("unchecked")
-		private Class<? extends Key<?>>[] getKeyClassesArray() {
-			return keyClasses.toArray(new Class[keyClasses.size()]);
-		}
-
-		/**
-		 * Adds an additional key field.
-		 * 
-		 * @param keyClass The class of the key data type.
-		 * @param keyColumn1 The position of the key in the first input's records.
-		 * @param keyColumn2 The position of the key in the second input's records.
-		 */
-		public Builder keyField(Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-			keyClasses.add(keyClass);
-			keyColumns1.add(keyColumn1);
-			keyColumns2.add(keyColumn2);
-			return this;
-		}
-		
-		/**
-		 * Sets the first input.
-		 * 
-		 * @param input The first input.
-		 */
-		public Builder input1(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs1.clear();
-			this.inputs1.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets the second input.
-		 * 
-		 * @param input The second input.
-		 */
-		public Builder input2(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs2.clear();
-			this.inputs2.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union) for input 1.
-		 * 
-		 * @param inputs The operators whose union forms the first input.
-		 */
-		public Builder input1(Operator<Record>...inputs) {
-			this.inputs1.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs1.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union) for input 2.
-		 * 
-		 * @param inputs The operators whose union forms the second input.
-		 */
-		public Builder input2(Operator<Record>...inputs) {
-			this.inputs2.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs2.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets the first inputs.
-		 * 
-		 * @param inputs The list of operators producing the first input.
-		 */
-		public Builder inputs1(List<Operator<Record>> inputs) {
-			this.inputs1 = inputs;
-			return this;
-		}
-		
-		/**
-		 * Sets the second inputs.
-		 * 
-		 * @param inputs The list of operators producing the second input.
-		 */
-		public Builder inputs2(List<Operator<Record>> inputs) {
-			this.inputs2 = inputs;
-			return this;
-		}
-		
-		/**
-		 * Binds the result produced by the given {@code input} operator to the
-		 * broadcast variable named {@code name}, used by the UDF wrapped in this operator.
-		 */
-		public Builder setBroadcastVariable(String name, Operator<Record> input) {
-			this.broadcastInputs.put(name, input);
-			return this;
-		}
-		
-		/**
-		 * Binds multiple broadcast variables.
-		 */
-		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
-			this.broadcastInputs.clear();
-			this.broadcastInputs.putAll(inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the name of this operator.
-		 */
-		public Builder name(String name) {
-			this.name = name;
-			return this;
-		}
-		
-		/**
-		 * Creates and returns a JoinOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		public JoinOperator build() {
-			if (keyClasses.isEmpty()) {
-				throw new IllegalStateException("At least one key attribute has to be set.");
-			}
-			if (name == null) {
-				name = udf.getUserCodeClass().getName();
-			}
-			return new JoinOperator(this);
-		}
-	}
-}
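
The builder above collects key classes and key columns for both inputs; the DataSet API states the same join with where()/equalTo() key positions and a JoinFunction. A minimal sketch over illustrative Tuple2 inputs keyed on field 0:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Long, String>> left = env.fromElements(Tuple2.of(1L, "l"));
            DataSet<Tuple2<Long, String>> right = env.fromElements(Tuple2.of(1L, "r"));

            // where(0)/equalTo(0) replace the builder's keyField(keyClass, col1, col2).
            DataSet<Tuple2<Long, String>> joined = left.join(right)
                    .where(0).equalTo(0)
                    .with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>>() {
                        @Override
                        public Tuple2<Long, String> join(Tuple2<Long, String> l, Tuple2<Long, String> r) {
                            return Tuple2.of(l.f0, l.f1 + r.f1);
                        }
                    });
            joined.print();
        }
    }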

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
deleted file mode 100644
index ed4175b..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.CollectorMapOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * MapOperator that applies a {@link MapFunction} to each record independently.
- * 
- * @see MapFunction
- */
-
-@Deprecated
-public class MapOperator extends CollectorMapOperatorBase<Record, Record, MapFunction> implements RecordOperator {
-	
-	private static final String DEFAULT_NAME = "<Unnamed Mapper>";
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a Builder with the provided {@link MapFunction} implementation.
-	 * 
-	 * @param udf The {@link MapFunction} implementation for this Map operator.
-	 */
-	public static Builder builder(MapFunction udf) {
-		return new Builder(new UserCodeObjectWrapper<MapFunction>(udf));
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link MapFunction} implementation.
-	 * 
-	 * @param udf The {@link MapFunction} implementation for this Map operator.
-	 */
-	public static Builder builder(Class<? extends MapFunction> udf) {
-		return new Builder(new UserCodeClassWrapper<MapFunction>(udf));
-	}
-	
-	/**
-	 * The constructor that is only invoked from the Builder.
-	 * @param builder The builder holding this operator's configuration.
-	 */
-	protected MapOperator(Builder builder) {
-		super(builder.udf, OperatorInfoHelper.unary(), builder.name);
-		
-		if (builder.inputs != null && !builder.inputs.isEmpty()) {
-			setInput(Operator.createUnionCascade(builder.inputs));
-		}
-		
-		setBroadcastVariables(builder.broadcastInputs);
-		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
-	}
-	
-
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return emptyClassArray();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder {
-		
-		/* The required parameters */
-		private final UserCodeWrapper<MapFunction> udf;
-		
-		/* The optional parameters */
-		private List<Operator<Record>> inputs;
-		private Map<String, Operator<Record>> broadcastInputs;
-		private String name = DEFAULT_NAME;
-		
-		/**
-		 * Creates a Builder with the provided {@link MapFunction} implementation.
-		 * 
-		 * @param udf The {@link MapFunction} implementation for this Map operator.
-		 */
-		private Builder(UserCodeWrapper<MapFunction> udf) {
-			this.udf = udf;
-			this.inputs = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		/**
-		 * Sets the input.
-		 * 
-		 * @param input The input.
-		 */
-		public Builder input(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs.clear();
-			this.inputs.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union).
-		 * 
-		 * @param inputs The operators whose union forms the input.
-		 */
-		public Builder input(Operator<Record>...inputs) {
-			this.inputs.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets the inputs.
-		 * 
-		 * @param inputs The list of operators producing the input.
-		 */
-		public Builder inputs(List<Operator<Record>> inputs) {
-			this.inputs = inputs;
-			return this;
-		}
-		
-		/**
-		 * Binds the result produced by the given {@code input} operator to the
-		 * broadcast variable named {@code name}, used by the UDF wrapped in this operator.
-		 */
-		public Builder setBroadcastVariable(String name, Operator<Record> input) {
-			this.broadcastInputs.put(name, input);
-			return this;
-		}
-		
-		/**
-		 * Binds multiple broadcast variables.
-		 */
-		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
-			this.broadcastInputs.clear();
-			this.broadcastInputs.putAll(inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the name of this operator.
-		 * 
-		 * @param name The name for this operator.
-		 */
-		public Builder name(String name) {
-			this.name = name;
-			return this;
-		}
-		
-		/**
-		 * Creates and returns a MapOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		public MapOperator build() {
-			if (name == null) {
-				name = udf.getUserCodeClass().getName();
-			}
-			return new MapOperator(this);
-		}
-	}
-}
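
MapOperator applied a CollectorMap-style MapFunction to each Record independently; in the DataSet API the common one-in/one-out case is a plain DataSet#map (flatMap covers the zero-or-more-outputs case). A minimal sketch:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class MapSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Integer> numbers = env.fromElements(1, 2, 3);

            // Each element is transformed independently.
            DataSet<Integer> doubled = numbers.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            });
            doubled.print();
        }
    }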

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
deleted file mode 100644
index 235e5ad..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.MapPartitionFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * MapPartitionOperator that applies a {@link MapPartitionFunction} to each partition of records.
- * 
- * @see MapPartitionFunction
- */
-
-@Deprecated
-public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
-	
-	private static final String DEFAULT_NAME = "<Unnamed MapPartition>";
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
-	 * 
-	 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
-	 */
-	public static Builder builder(MapPartitionFunction udf) {
-		return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
-	 * 
-	 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
-	 */
-	public static Builder builder(Class<? extends MapPartitionFunction> udf) {
-		return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
-	}
-	
-	/**
-	 * The constructor that is only invoked from the Builder.
-	 * @param builder The builder holding this operator's configuration.
-	 */
-	protected MapPartitionOperator(Builder builder) {
-
-		super(builder.udf, OperatorInfoHelper.unary(), builder.name);
-		
-		if (builder.inputs != null && !builder.inputs.isEmpty()) {
-			setInput(Operator.createUnionCascade(builder.inputs));
-		}
-		
-		setBroadcastVariables(builder.broadcastInputs);
-		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
-	}
-	
-
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return emptyClassArray();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder {
-		
-		/* The required parameters */
-		private final UserCodeWrapper<MapPartitionFunction> udf;
-		
-		/* The optional parameters */
-		private List<Operator<Record>> inputs;
-		private Map<String, Operator<Record>> broadcastInputs;
-		private String name = DEFAULT_NAME;
-		
-		/**
-		 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
-		 * 
-		 * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
-		 */
-		private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
-			this.udf = udf;
-			this.inputs = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		/**
-		 * Sets the input.
-		 * 
-		 * @param input The input.
-		 */
-		public Builder input(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs.clear();
-			this.inputs.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union).
-		 * 
-		 * @param inputs The inputs; several inputs are unioned.
-		 */
-		public Builder input(Operator<Record>...inputs) {
-			this.inputs.clear();
-			Collections.addAll(this.inputs, inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the inputs.
-		 * 
-		 * @param inputs The list of inputs.
-		 */
-		public Builder inputs(List<Operator<Record>> inputs) {
-			this.inputs = inputs;
-			return this;
-		}
-		
-		/**
-		 * Binds the result produced by the plan rooted at {@code input} to the broadcast
-		 * variable with the given name, as used by the UDF wrapped in this operator.
-		 */
-		public Builder setBroadcastVariable(String name, Operator<Record> input) {
-			this.broadcastInputs.put(name, input);
-			return this;
-		}
-		
-		/**
-		 * Binds multiple broadcast variables.
-		 */
-		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
-			this.broadcastInputs.clear();
-			this.broadcastInputs.putAll(inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the name of this operator.
-		 * 
-		 * @param name The name for this operator.
-		 */
-		public Builder name(String name) {
-			this.name = name;
-			return this;
-		}
-		
-		/**
-		 * Creates and returns a MapPartitionOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		public MapPartitionOperator build() {
-			if (name == null) {
-				name = udf.getUserCodeClass().getName();
-			}
-			return new MapPartitionOperator(this);
-		}
-	}
-}
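
For users of the removed class: the DataSet API's mapPartition transformation is the
functional equivalent. A minimal, self-contained sketch against the Java DataSet API
(data values and class name are made up for illustration):

    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class MapPartitionMigrationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Long> input = env.fromElements(1L, 2L, 3L, 4L);

            // unlike map, the function is invoked once per parallel partition,
            // receiving all records of that partition in a single Iterable
            DataSet<Long> counts = input.mapPartition(new MapPartitionFunction<Long, Long>() {
                @Override
                public void mapPartition(Iterable<Long> values, Collector<Long> out) {
                    long count = 0;
                    for (Long ignored : values) {
                        count++;
                    }
                    out.collect(count);
                }
            });

            counts.print(); // triggers execution; emits one count per partition
        }
    }

Note that inputs, broadcast variables, and the operator name move from the Builder to
methods on the data sets and operators themselves (e.g. withBroadcastSet(...), name(...)).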

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java
deleted file mode 100644
index 7b2bb5e..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/OperatorInfoHelper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.java.typeutils.RecordTypeInfo;
-import org.apache.flink.types.Record;
-
-/**
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * Helper for creating the {@link OperatorInformation} of {@link Record}-typed operators.
- */
-@Deprecated
-public class OperatorInfoHelper {
-
-	public static OperatorInformation<Record> source() {
-		return new OperatorInformation<Record>(new RecordTypeInfo());
-	}
-
-	public static UnaryOperatorInformation<Record, Record> unary() {
-		return new UnaryOperatorInformation<Record, Record>(new RecordTypeInfo(), new RecordTypeInfo());
-	}
-
-	public static BinaryOperatorInformation<Record, Record, Record> binary() {
-		return new BinaryOperatorInformation<Record, Record, Record>(new RecordTypeInfo(), new RecordTypeInfo(), new RecordTypeInfo());
-	}
-}
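
The helper above merely pinned every operator's type information to RecordTypeInfo. In
the typed APIs the same information is constructed directly from concrete
TypeInformation instances; a minimal sketch (types chosen arbitrarily for illustration):

    import org.apache.flink.api.common.operators.OperatorInformation;
    import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

    public class TypedOperatorInfoSketch {

        public static void main(String[] args) {
            // a source only declares an output type
            OperatorInformation<String> source =
                    new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO);

            // a unary operator declares input and output types
            UnaryOperatorInformation<String, Integer> unary =
                    new UnaryOperatorInformation<String, Integer>(
                            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

            System.out.println(source.getOutputType() + " -> " + unary.getOutputType());
        }
    }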

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
deleted file mode 100644
index 05905cb..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.operators;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.google.common.base.Preconditions;
-
-/**
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * ReduceOperator evaluating a {@link ReduceFunction} over each group of records that share the same key.
- * 
- * @see ReduceFunction
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, GroupReduceFunction<Record, Record>> implements RecordOperator {
-	
-	private static final String DEFAULT_NAME = "<Unnamed Reducer>";		// the default name for contracts
-	
-	/**
-	 * The types of the keys that the contract operates on.
-	 */
-	private final Class<? extends Key<?>>[] keyTypes;
-	
-	private final UserCodeWrapper<ReduceFunction> originalFunction;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a Builder with the provided {@link ReduceFunction} implementation.
-	 * 
-	 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
-	 */
-	public static Builder builder(ReduceFunction udf) {
-		UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
-		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
-				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
-		
-		return new Builder(original, wrapped);
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link ReduceFunction} implementation.
-	 * 
-	 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
-	 * @param keyClass The class of the key data type.
-	 * @param keyColumn The position of the key.
-	 */
-	public static Builder builder(ReduceFunction udf, Class<? extends Key<?>> keyClass, int keyColumn) {
-		UserCodeWrapper<ReduceFunction> original = new UserCodeObjectWrapper<ReduceFunction>(udf);
-		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
-				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingReduceFunction(udf));
-		
-		return new Builder(original, wrapped, keyClass, keyColumn);
-	}
-
-	/**
-	 * Creates a Builder with the provided {@link ReduceFunction} implementation.
-	 * 
-	 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
-	 */
-	public static Builder builder(Class<? extends ReduceFunction> udf) {
-		UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
-		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
-				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
-		
-		return new Builder(original, wrapped);
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link ReduceFunction} implementation.
-	 * 
-	 * @param udf The {@link ReduceFunction} implementation for this Reduce contract.
-	 * @param keyClass The class of the key data type.
-	 * @param keyColumn The position of the key.
-	 */
-	public static Builder builder(Class<? extends ReduceFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn) {
-		UserCodeWrapper<ReduceFunction> original = new UserCodeClassWrapper<ReduceFunction>(udf);
-		UserCodeWrapper<GroupReduceFunction<Record, Record>> wrapped =
-				new UserCodeObjectWrapper<GroupReduceFunction<Record, Record>>(new WrappingClassReduceFunction(udf));
-		
-		return new Builder(original, wrapped, keyClass, keyColumn);
-	}
-	
-	/**
-	 * Creates the operator from the given builder; invoked only via {@link Builder#build()}.
-	 * 
-	 * @param builder The builder holding the configured values for this operator.
-	 */
-	protected ReduceOperator(Builder builder) {
-		super(builder.udfWrapper, OperatorInfoHelper.unary(), builder.getKeyColumnsArray(), builder.name);
-		
-		this.keyTypes = builder.getKeyClassesArray();
-		this.originalFunction = builder.originalUdf;
-		
-		if (builder.inputs != null && !builder.inputs.isEmpty()) {
-			setInput(Operator.createUnionCascade(builder.inputs));
-		}
-		
-		setGroupOrder(builder.secondaryOrder);
-		setBroadcastVariables(builder.broadcastInputs);
-		
-		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(originalFunction));
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return this.keyTypes;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isCombinable() {
-		return super.isCombinable() || originalFunction.getUserCodeAnnotation(Combinable.class) != null;
-	}
-	
-	/**
-	 * This annotation marks reduce stubs as eligible for the usage of a combiner.
-	 * 
-	 * The following code excerpt shows how to make a simple reduce stub combinable (assuming here that
-	 * the reducer function and combiner function do the same):
-	 * 
-	 * <code>
-	 * \@Combinable
-	 * public static class CountWords extends ReduceFunction
-	 * {
-	 *     private final IntValue theInteger = new IntValue();
-	 * 
-	 *     \@Override
-	 *     public void reduce(Iterator&lt;Record&gt; records, Collector&lt;Record&gt; out) throws Exception
-	 *     {
-	 *         Record element = null;
-	 *         int sum = 0;
-	 *         while (records.hasNext()) {
-	 *             element = records.next();
-	 *             element.getField(1, this.theInteger);
-	 *             // we could have equivalently used IntValue i = record.getField(1, IntValue.class);
-	 *             sum += this.theInteger.getValue();
-	 *         }
-	 * 
-	 *         // write the accumulated sum back into the record before emitting it
-	 *         this.theInteger.setValue(sum);
-	 *         element.setField(1, this.theInteger);
-	 *         out.collect(element);
-	 *     }
-	 * 
-	 *     \@Override
-	 *     public void combine(Iterator&lt;Record&gt; records, Collector&lt;Record&gt; out) throws Exception
-	 *     {
-	 *         this.reduce(records, out);
-	 *     }
-	 * }
-	 * </code>
-	 */
-	@Retention(RetentionPolicy.RUNTIME)
-	@Target(ElementType.TYPE)
-	public static @interface Combinable {}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder {
-		
-		/* The required parameters */
-		private final UserCodeWrapper<ReduceFunction> originalUdf;
-		private final UserCodeWrapper<GroupReduceFunction<Record, Record>> udfWrapper;
-		private final List<Class<? extends Key<?>>> keyClasses;
-		private final List<Integer> keyColumns;
-		
-		/* The optional parameters */
-		private Ordering secondaryOrder = null;
-		private List<Operator<Record>> inputs;
-		private Map<String, Operator<Record>> broadcastInputs;
-		private String name = DEFAULT_NAME;
-		
-		/**
-		 * Creates a Builder with the provided {@link ReduceFunction} implementation.
-		 * 
-		 * @param originalUdf The wrapper holding the original {@link ReduceFunction} implementation.
-		 * @param wrappedUdf The wrapper holding the {@link GroupReduceFunction} that adapts the original function.
-		 */
-		private Builder(UserCodeWrapper<ReduceFunction> originalUdf, UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf) {
-			this.originalUdf = originalUdf;
-			this.udfWrapper = wrappedUdf;
-			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
-			this.keyColumns = new ArrayList<Integer>();
-			this.inputs = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		/**
-		 * Creates a Builder with the provided {@link ReduceFunction} implementation.
-		 * 
-		 * @param originalUdf The wrapper holding the original {@link ReduceFunction} implementation.
-		 * @param wrappedUdf The wrapper holding the {@link GroupReduceFunction} that adapts the original function.
-		 * @param keyClass The class of the key data type.
-		 * @param keyColumn The position of the key.
-		 */
-		private Builder(UserCodeWrapper<ReduceFunction> originalUdf, 
-						UserCodeWrapper<GroupReduceFunction<Record, Record>> wrappedUdf,
-						Class<? extends Key<?>> keyClass, int keyColumn)
-		{
-			this.originalUdf = originalUdf;
-			this.udfWrapper = wrappedUdf;
-			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
-			this.keyClasses.add(keyClass);
-			this.keyColumns = new ArrayList<Integer>();
-			this.keyColumns.add(keyColumn);
-			this.inputs = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		private int[] getKeyColumnsArray() {
-			int[] result = new int[keyColumns.size()];
-			for (int i = 0; i < keyColumns.size(); ++i) {
-				result[i] = keyColumns.get(i);
-			}
-			return result;
-		}
-		
-		@SuppressWarnings("unchecked")
-		private Class<? extends Key<?>>[] getKeyClassesArray() {
-			return keyClasses.toArray(new Class[keyClasses.size()]);
-		}
-
-		/**
-		 * Adds an additional key field.
-		 * 
-		 * @param keyClass The class of the key data type.
-		 * @param keyColumn The position of the key.
-		 */
-		public Builder keyField(Class<? extends Key<?>> keyClass, int keyColumn) {
-			keyClasses.add(keyClass);
-			keyColumns.add(keyColumn);
-			return this;
-		}
-		
-		/**
-		 * Sets the order of the elements within a group.
-		 * 
-		 * @param order The order for the elements in a group.
-		 */
-		public Builder secondaryOrder(Ordering order) {
-			this.secondaryOrder = order;
-			return this;
-		}
-		
-		/**
-		 * Sets the input.
-		 * 
-		 * @param input The input.
-		 */
-		public Builder input(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs.clear();
-			this.inputs.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union).
-		 * 
-		 * @param inputs The inputs; several inputs are unioned.
-		 */
-		public Builder input(Operator<Record>...inputs) {
-			this.inputs.clear();
-			Collections.addAll(this.inputs, inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the inputs.
-		 * 
-		 * @param inputs The list of inputs.
-		 */
-		public Builder inputs(List<Operator<Record>> inputs) {
-			this.inputs = inputs;
-			return this;
-		}
-		
-		/**
-		 * Binds the result produced by the plan rooted at {@code input} to the broadcast
-		 * variable with the given name, as used by the UDF wrapped in this operator.
-		 */
-		public Builder setBroadcastVariable(String name, Operator<Record> input) {
-			this.broadcastInputs.put(name, input);
-			return this;
-		}
-		
-		/**
-		 * Binds multiple broadcast variables.
-		 */
-		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
-			this.broadcastInputs.clear();
-			this.broadcastInputs.putAll(inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the name of this operator.
-		 * 
-		 * @param name The name for this operator.
-		 */
-		public Builder name(String name) {
-			this.name = name;
-			return this;
-		}
-		
-		/**
-		 * Creates and returns a ReduceOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		public ReduceOperator build() {
-			if (name == null) {
-				name = originalUdf.getUserCodeClass().getName();
-			}
-			return new ReduceOperator(this);
-		}
-	}
-	
-	// ============================================================================================
-	
-	public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> {
-		
-		private static final long serialVersionUID = 1L;
-		
-		public WrappingReduceFunction(ReduceFunction reducer) {
-			super(reducer);
-		}
-		
-		@Override
-		public final void reduce(Iterable<Record> records, Collector<Record> out) throws Exception {
-			this.wrappedFunction.reduce(records.iterator(), out);
-		}
-
-		@Override
-		public final void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
-			this.wrappedFunction.combine(records.iterator(), out);
-		}
-	}
-	
-	public static final class WrappingClassReduceFunction extends WrappingReduceFunction {
-		
-		private static final long serialVersionUID = 1L;
-		
-		public WrappingClassReduceFunction(Class<? extends ReduceFunction> reducer) {
-			super(InstantiationUtil.instantiate(reducer));
-		}
-		
-		private void writeObject(ObjectOutputStream out) throws IOException {
-			out.writeObject(wrappedFunction.getClass());
-		}
-
-		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-			Class<?> clazz = (Class<?>) in.readObject();
-			this.wrappedFunction = (ReduceFunction) InstantiationUtil.instantiate(clazz);
-		}
-	}
-}
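
For migration: grouping and group-reducing in the DataSet API replaces this operator,
and the @Combinable annotation is superseded by implementing GroupCombineFunction next
to GroupReduceFunction, exactly as the deleted WrappingReduceFunction above does. A
minimal word-count-style sketch (data and class names made up for illustration):

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class ReduceMigrationSketch {

        // implementing both interfaces makes the reduce combinable:
        // the runtime may pre-aggregate with combine() before the final reduce()
        public static class SumCounts
                implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
                           GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> records,
                               Collector<Tuple2<String, Integer>> out) {
                String key = null;
                int sum = 0;
                for (Tuple2<String, Integer> r : records) {
                    key = r.f0;
                    sum += r.f1;
                }
                out.collect(new Tuple2<String, Integer>(key, sum));
            }

            @Override
            public void combine(Iterable<Tuple2<String, Integer>> records,
                                Collector<Tuple2<String, Integer>> out) {
                reduce(records, out);
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> words = env.fromElements(
                    new Tuple2<String, Integer>("flink", 1),
                    new Tuple2<String, Integer>("record", 1),
                    new Tuple2<String, Integer>("flink", 1));

            words.groupBy(0).reduceGroup(new SumCounts()).print();
        }
    }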

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
deleted file mode 100644
index 3f7e7c5..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings({ "serial", "deprecation" })
-public class CoGroupWrappingFunctionTest {
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testWrappedCoGroupObject() {
-		try {
-			AtomicInteger methodCounter = new AtomicInteger();
-			
-			CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(methodCounter), LongValue.class, 1, 2).build();
-			
-			RichFunction cogrouper = (RichFunction) coGroupOp.getUserCodeWrapper().getUserCodeObject();
-			
-			// test the method invocations
-			cogrouper.close();
-			cogrouper.open(new Configuration());
-			assertEquals(2, methodCounter.get());
-			
-			// prepare the coGroup
-			final List<Record> target = new ArrayList<Record>();
-			Collector<Record> collector = new Collector<Record>() {
-				@Override
-				public void collect(Record record) {
-					target.add(record);
-				}
-				@Override
-				public void close() {}
-			};
-			
-			List<Record> source1 = new ArrayList<Record>();
-			source1.add(new Record(new IntValue(42)));
-			source1.add(new Record(new IntValue(13)));
-			
-			List<Record> source2 = new ArrayList<Record>();
-			source2.add(new Record(new LongValue(11)));
-			source2.add(new Record(new LongValue(17)));
-			
-			// test coGroup
-			((org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>) cogrouper).coGroup(source1, source2, collector);
-			assertEquals(4, target.size());
-			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
-			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
-			assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
-			assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
-			target.clear();
-			
-			// test the serialization
-			SerializationUtils.clone((java.io.Serializable) cogrouper);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testWrappedCoGroupClass() {
-		try {
-			CoGroupOperator coGroupOp = CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 2).build();
-			
-			UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf = coGroupOp.getUserCodeWrapper();
-			UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> copy = SerializationUtils.clone(udf);
-			org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> cogrouper = copy.getUserCodeObject();
-			
-			// prepare the coGroup
-			final List<Record> target = new ArrayList<Record>();
-			Collector<Record> collector = new Collector<Record>() {
-				@Override
-				public void collect(Record record) {
-					target.add(record);
-				}
-				@Override
-				public void close() {}
-			};
-			
-			List<Record> source1 = new ArrayList<Record>();
-			source1.add(new Record(new IntValue(42)));
-			source1.add(new Record(new IntValue(13)));
-			
-			List<Record> source2 = new ArrayList<Record>();
-			source2.add(new Record(new LongValue(11)));
-			source2.add(new Record(new LongValue(17)));
-			
-			// test coGroup
-			cogrouper.coGroup(source1, source2, collector);
-			assertEquals(4, target.size());
-			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
-			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
-			assertEquals(new LongValue(11), target.get(2).getField(0, LongValue.class));
-			assertEquals(new LongValue(17), target.get(3).getField(0, LongValue.class));
-			target.clear();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testExtractSemantics() {
-		try {
-			{
-				CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(), LongValue.class, 1, 2).build();
-				
-				DualInputSemanticProperties props = coGroupOp.getSemanticProperties();
-				FieldSet fw2 = props.getForwardingTargetFields(0, 2);
-				FieldSet fw4 = props.getForwardingTargetFields(1, 4);
-				assertNotNull(fw2);
-				assertNotNull(fw4);
-				assertEquals(1, fw2.size());
-				assertEquals(1, fw4.size());
-				assertTrue(fw2.contains(2));
-				assertTrue(fw4.contains(4));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@ConstantFieldsFirst(2)
-	@ConstantFieldsSecond(4)
-	public static class TestCoGroupFunction extends CoGroupFunction {
-		
-		private final AtomicInteger methodCounter;
-		
-		private TestCoGroupFunction(AtomicInteger methodCounter) {
-			this.methodCounter = methodCounter;
-		}
-		
-		public TestCoGroupFunction() {
-			methodCounter = new AtomicInteger();
-		}
-		
-		@Override
-		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception {
-			while (records1.hasNext()) {
-				out.collect(records1.next());
-			}
-			while (records2.hasNext()) {
-				out.collect(records2.next());
-			}
-		}
-		
-		@Override
-		public void close() throws Exception {
-			methodCounter.incrementAndGet();
-			super.close();
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			methodCounter.incrementAndGet();
-			super.open(parameters);
-		}
-	}
-}
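
The test above exercises the record-based CoGroupOperator builder; its DataSet API
counterpart declares keys on the operator via where()/equalTo() instead of the builder.
A minimal sketch with made-up tuple data:

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class CoGroupMigrationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, String>> left = env.fromElements(
                    new Tuple2<Long, String>(1L, "a"), new Tuple2<Long, String>(2L, "b"));
            DataSet<Tuple2<Long, String>> right = env.fromElements(
                    new Tuple2<Long, String>(1L, "x"), new Tuple2<Long, String>(3L, "y"));

            DataSet<String> merged = left.coGroup(right)
                    .where(0)   // key of the first input
                    .equalTo(0) // key of the second input
                    .with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<Long, String>> first,
                                            Iterable<Tuple2<Long, String>> second,
                                            Collector<String> out) {
                            // emit all values from both sides, mirroring TestCoGroupFunction
                            for (Tuple2<Long, String> t : first) {
                                out.collect(t.f1);
                            }
                            for (Tuple2<Long, String> t : second) {
                                out.collect(t.f1);
                            }
                        }
                    });

            merged.print();
        }
    }

The semantic annotations checked in testExtractSemantics (@ConstantFieldsFirst,
@ConstantFieldsSecond) map to the DataSet API's @ForwardedFieldsFirst and
@ForwardedFieldsSecond function annotations.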