Posted to commits@flink.apache.org by dw...@apache.org on 2017/07/25 08:19:06 UTC
[4/6] flink git commit: [FLINK-7181] Activate checkstyle flink-java/operators/*
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index ee5ab2e..bac85ee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -33,23 +33,27 @@ import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import java.util.Arrays;
+/**
+ * An operation that allows storing data results.
+ * @param <T>
+ */
@Public
public class DataSink<T> {
-
+
private final OutputFormat<T> format;
-
+
private final TypeInformation<T> type;
-
+
private final DataSet<T> data;
-
+
private String name;
-
+
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
private ResourceSpec minResources = ResourceSpec.DEFAULT;
@@ -72,14 +76,12 @@ public class DataSink<T> {
if (data == null) {
throw new IllegalArgumentException("The data set must not be null.");
}
-
-
+
this.format = format;
this.data = data;
this.type = type;
}
-
@Internal
public OutputFormat<T> getFormat() {
return format;
@@ -96,7 +98,7 @@ public class DataSink<T> {
}
/**
- * Pass a configuration to the OutputFormat
+ * Pass a configuration to the OutputFormat.
* @param parameters Configuration parameters
*/
public DataSink<T> withParameters(Configuration parameters) {
@@ -106,9 +108,11 @@ public class DataSink<T> {
/**
* Sorts each local partition of a {@link org.apache.flink.api.java.tuple.Tuple} data set
- * on the specified field in the specified {@link Order} before it is emitted by the output format.<br>
- * <b>Note: Only tuple data sets can be sorted using integer field indices.</b><br>
- * The tuple data set can be sorted on multiple fields in different orders
+ * on the specified field in the specified {@link Order} before it is emitted by the output format.
+ *
+ * <p><b>Note: Only tuple data sets can be sorted using integer field indices.</b>
+ *
+ * <p>The tuple data set can be sorted on multiple fields in different orders
* by chaining {@link #sortLocalOutput(int, Order)} calls.
*
* @param field The Tuple field on which the data set is locally sorted.
@@ -132,7 +136,7 @@ public class DataSink<T> {
throw new InvalidProgramException("Selected sort key is not a sortable type");
}
- if(this.sortKeyPositions == null) {
+ if (this.sortKeyPositions == null) {
// set sorting info
this.sortKeyPositions = flatKeys;
this.sortOrders = new Order[flatKeys.length];
@@ -144,9 +148,9 @@ public class DataSink<T> {
this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
- for(int i=0; i<flatKeys.length; i++) {
- this.sortKeyPositions[oldLength+i] = flatKeys[i];
- this.sortOrders[oldLength+i] = order;
+ for (int i = 0; i < flatKeys.length; i++) {
+ this.sortKeyPositions[oldLength + i] = flatKeys[i];
+ this.sortOrders[oldLength + i] = order;
}
}
@@ -155,10 +159,12 @@ public class DataSink<T> {
/**
* Sorts each local partition of a data set on the field(s) specified by the field expression
- * in the specified {@link Order} before it is emitted by the output format.<br>
- * <b>Note: Non-composite types can only be sorted on the full element which is specified by
- * a wildcard expression ("*" or "_").</b><br>
- * Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
+ * in the specified {@link Order} before it is emitted by the output format.
+ *
+ * <p><b>Note: Non-composite types can only be sorted on the full element which is specified by
+ * a wildcard expression ("*" or "_").</b>
+ *
+ * <p>Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
* by chaining {@link #sortLocalOutput(String, Order)} calls.
*
* @param fieldExpression The field expression for the field(s) on which the data set is locally sorted.
@@ -189,7 +195,7 @@ public class DataSink<T> {
orders = new Order[numFields];
Arrays.fill(orders, order);
- if(this.sortKeyPositions == null) {
+ if (this.sortKeyPositions == null) {
// set sorting info
this.sortKeyPositions = fields;
this.sortOrders = orders;
@@ -199,9 +205,9 @@ public class DataSink<T> {
int newLength = oldLength + numFields;
this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
- for(int i=0; i<numFields; i++) {
- this.sortKeyPositions[oldLength+i] = fields[i];
- this.sortOrders[oldLength+i] = orders[i];
+ for (int i = 0; i < numFields; i++) {
+ this.sortKeyPositions[oldLength + i] = fields[i];
+ this.sortOrders[oldLength + i] = orders[i];
}
}
@@ -214,16 +220,16 @@ public class DataSink<T> {
public Configuration getParameters() {
return this.parameters;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public DataSink<T> name(String name) {
this.name = name;
return this;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
protected GenericDataSinkBase<T> translateToDataFlow(Operator<T> input) {
// select the name (or create a default one)
String name = this.name != null ? this.name : this.format.toString();
@@ -231,11 +237,11 @@ public class DataSink<T> {
// set input
sink.setInput(input);
// set parameters
- if(this.parameters != null) {
+ if (this.parameters != null) {
sink.getParameters().addAll(this.parameters);
}
// set parallelism
- if(this.parallelism > 0) {
+ if (this.parallelism > 0) {
// use specified parallelism
sink.setParallelism(this.parallelism);
} else {
@@ -243,34 +249,34 @@ public class DataSink<T> {
sink.setParallelism(input.getParallelism());
}
- if(this.sortKeyPositions != null) {
+ if (this.sortKeyPositions != null) {
// configure output sorting
Ordering ordering = new Ordering();
- for(int i=0; i<this.sortKeyPositions.length; i++) {
+ for (int i = 0; i < this.sortKeyPositions.length; i++) {
ordering.appendOrdering(this.sortKeyPositions[i], null, this.sortOrders[i]);
}
sink.setLocalOrder(ordering);
}
-
+
return sink;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "DataSink '" + (this.name == null ? "<unnamed>" : this.name) + "' (" + this.format.toString() + ")";
}
-
+
/**
* Returns the parallelism of this data sink.
- *
+ *
* @return The parallelism of this data sink.
*/
public int getParallelism() {
return this.parallelism;
}
-
+
/**
* Sets the parallelism for this data sink.
* The degree must be 1 or more.
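As background for the sortLocalOutput(...) Javadoc cleaned up above: a minimal, self-contained sketch of locally sorted sink output, assuming a local CSV path (names and data are illustrative).

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortedSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> counts = env.fromElements(
                Tuple2.of("b", 2), Tuple2.of("a", 3), Tuple2.of("a", 1));
        counts.writeAsCsv("file:///tmp/sorted-counts")
              .sortLocalOutput(0, Order.ASCENDING)   // primary sort on tuple field 0
              .sortLocalOutput(1, Order.DESCENDING)  // chained call adds field 1, as the Javadoc describes
              .name("sorted csv sink");
        env.execute("sortLocalOutput sketch");
    }
}

The field-expression overload works the same way, taking "f0"-style expressions (or "*" for non-composite types) instead of integer indices.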
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index af6f65b..8ae1c7d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -34,7 +34,7 @@ import org.apache.flink.configuration.Configuration;
* An operation that creates a new data set (data source). The operation acts as the
* data set on which to apply further transformations. It encapsulates additional
* configuration parameters, to customize the execution.
- *
+ *
* @param <OUT> The type of the elements produced by this data source.
*/
@Public
@@ -49,25 +49,25 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
private SplitDataProperties<OUT> splitDataProperties;
// --------------------------------------------------------------------------------------------
-
+
/**
* Creates a new data source.
- *
+ *
* @param context The environment in which the data source gets executed.
* @param inputFormat The input format that the data source executes.
* @param type The type of the elements produced by this input format.
*/
public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
super(context, type);
-
+
this.dataSourceLocationName = dataSourceLocationName;
-
+
if (inputFormat == null) {
throw new IllegalArgumentException("The input format may not be null.");
}
-
+
this.inputFormat = inputFormat;
-
+
if (inputFormat instanceof NonParallelInput) {
this.parallelism = 1;
}
@@ -75,23 +75,23 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
/**
* Gets the input format that is executed by this data source.
- *
+ *
* @return The input format that is executed by this data source.
*/
@Internal
public InputFormat<OUT, ?> getInputFormat() {
return this.inputFormat;
}
-
+
/**
- * Pass a configuration to the InputFormat
+ * Pass a configuration to the InputFormat.
* @param parameters Configuration parameters
*/
public DataSource<OUT> withParameters(Configuration parameters) {
this.parameters = parameters;
return this;
}
-
+
/**
* @return Configuration for the InputFormat.
*/
@@ -99,15 +99,15 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
return this.parameters;
}
-
/**
* Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} for the
* {@link org.apache.flink.core.io.InputSplit}s of this DataSource
* for configurations.
*
- * SplitDataProperties can help to generate more efficient execution plans.
- * <br>
- * <b>
+ * <p>SplitDataProperties can help to generate more efficient execution plans.
+ *
+ *
+ * <p><b>
* IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results!
* </b>
*
@@ -115,28 +115,28 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
*/
@PublicEvolving
public SplitDataProperties<OUT> getSplitDataProperties() {
- if(this.splitDataProperties == null) {
+ if (this.splitDataProperties == null) {
this.splitDataProperties = new SplitDataProperties<OUT>(this);
}
return this.splitDataProperties;
}
// --------------------------------------------------------------------------------------------
-
+
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
- String name = this.name != null ? this.name : "at "+dataSourceLocationName+" ("+inputFormat.getClass().getName()+")";
+ String name = this.name != null ? this.name : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")";
if (name.length() > 150) {
name = name.substring(0, 150);
}
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
- new OperatorInformation<OUT>(getType()), name);
+ new OperatorInformation<OUT>(getType()), name);
source.setParallelism(parallelism);
- if(this.parameters != null) {
+ if (this.parameters != null) {
source.getParameters().addAll(this.parameters);
}
- if(this.splitDataProperties != null) {
+ if (this.splitDataProperties != null) {
source.setSplitDataProperties(this.splitDataProperties);
}
return source;
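For context on withParameters(Configuration), whose Javadoc gains its period above: the configuration is handed to the input format when it is configured. A minimal sketch, assuming a directory of text files; recursive.file.enumeration is a FileInputFormat parameter. SplitDataProperties (getSplitDataProperties above) are deliberately left out here, since, as the Javadoc warns, misconfiguring them can produce wrong results.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

public class SourceParametersSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Configuration params = new Configuration();
        params.setBoolean("recursive.file.enumeration", true); // descend into subdirectories
        DataSource<String> lines = env.readTextFile("file:///tmp/input")
                .withParameters(params);
        lines.print();
    }
}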
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index 61f83b1..dc80e70 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -18,10 +18,8 @@
package org.apache.flink.api.java.operators;
-import java.util.Arrays;
-
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.aggregators.Aggregator;
@@ -35,10 +33,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
+import java.util.Arrays;
+
/**
* The DeltaIteration represents the start of a delta iteration. It is created from the DataSet that
* represents the initial solution set via the {@link DataSet#iterateDelta(DataSet, int, int...)} method.
- *
+ *
* @param <ST> The data type of the solution set.
* @param <WT> The data type of the workset (the feedback data set).
*
@@ -47,27 +47,27 @@ import org.apache.flink.util.Preconditions;
*/
@Public
public class DeltaIteration<ST, WT> {
-
+
private final AggregatorRegistry aggregators = new AggregatorRegistry();
-
+
private final DataSet<ST> initialSolutionSet;
private final DataSet<WT> initialWorkset;
-
+
private final SolutionSetPlaceHolder<ST> solutionSetPlaceholder;
private final WorksetPlaceHolder<WT> worksetPlaceholder;
private final Keys<ST> keys;
-
+
private final int maxIterations;
-
+
private String name;
-
+
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
private ResourceSpec minResources = ResourceSpec.DEFAULT;
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
-
+
private boolean solutionSetUnManaged;
public DeltaIteration(ExecutionEnvironment context, TypeInformation<ST> type, DataSet<ST> solutionSet, DataSet<WT> workset, Keys<ST> keys, int maxIterations) {
@@ -80,15 +80,15 @@ public class DeltaIteration<ST, WT> {
}
// --------------------------------------------------------------------------------------------
-
+
/**
* Closes the delta iteration. This method defines the end of the delta iteration's function.
- *
+ *
* @param solutionSetDelta The delta for the solution set. The delta will be merged into the solution set at the end of
* each iteration.
* @param newWorkset The new workset (feedback data set) that will be fed back to the next iteration.
* @return The DataSet that represents the result of the iteration, after the computation has terminated.
- *
+ *
* @see DataSet#iterateDelta(DataSet, int, int...)
*/
public DataSet<ST> closeWith(DataSet<ST> solutionSetDelta, DataSet<WT> newWorkset) {
@@ -98,18 +98,18 @@ public class DeltaIteration<ST, WT> {
/**
* Gets the initial solution set. This is the data set on which the delta iteration was started.
- * <p>
- * Consider the following example:
+ *
+ * <p>Consider the following example:
* <pre>
* {@code
* DataSet<MyType> solutionSetData = ...;
* DataSet<AnotherType> worksetData = ...;
- *
+ *
* DeltaIteration<MyType, AnotherType> iteration = solutionSetData.iteratorDelta(worksetData, 10, ...);
* }
* </pre>
* The <tt>solutionSetData</tt> would be the data set returned by {@code iteration.getInitialSolutionSet();}.
- *
+ *
* @return The data set that forms the initial solution set.
*/
public DataSet<ST> getInitialSolutionSet() {
@@ -119,18 +119,18 @@ public class DeltaIteration<ST, WT> {
/**
* Gets the initial workset. This is the data set passed to the method that starts the delta
* iteration.
- * <p>
- * Consider the following example:
+ *
+ * <p>Consider the following example:
* <pre>
* {@code
* DataSet<MyType> solutionSetData = ...;
* DataSet<AnotherType> worksetData = ...;
- *
+ *
* DeltaIteration<MyType, AnotherType> iteration = solutionSetData.iteratorDelta(worksetData, 10, ...);
* }
* </pre>
* The <tt>worksetData</tt> would be the data set returned by {@code iteration.getInitialWorkset();}.
- *
+ *
* @return The data set that forms the initial workset.
*/
public DataSet<WT> getInitialWorkset() {
@@ -139,7 +139,7 @@ public class DeltaIteration<ST, WT> {
/**
* Gets the solution set of the delta iteration. The solution set represents the state that is kept across iterations.
- *
+ *
* @return The solution set of the delta iteration.
*/
public SolutionSetPlaceHolder<ST> getSolutionSet() {
@@ -157,7 +157,7 @@ public class DeltaIteration<ST, WT> {
/**
* Sets the name for the iteration. The name is displayed in logs and messages.
- *
+ *
* @param name The name for the iteration.
* @return The iteration object, for function call chaining.
*/
@@ -165,16 +165,16 @@ public class DeltaIteration<ST, WT> {
this.name = name;
return this;
}
-
+
/**
* Gets the name from this iteration.
- *
+ *
* @return The name of the iteration.
*/
public String getName() {
return name;
}
-
+
/**
* Sets the parallelism for the iteration.
*
@@ -187,10 +187,10 @@ public class DeltaIteration<ST, WT> {
this.parallelism = parallelism;
return this;
}
-
+
/**
* Gets the iteration's parallelism.
- *
+ *
* @return The iteration's parallelism, or {@link ExecutionConfig#PARALLELISM_DEFAULT} if not set.
*/
public int getParallelism() {
@@ -266,13 +266,13 @@ public class DeltaIteration<ST, WT> {
* iteration, such as number of elements processed. The aggregators compute global aggregates: After each iteration step,
* the values are globally aggregated to produce one aggregate that represents statistics across all parallel instances.
* The value of an aggregator can be accessed in the next iteration.
- * <p>
- * Aggregators can be accessed inside a function via the
+ *
+ * <p>Aggregators can be accessed inside a function via the
* {@link org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()} method.
- *
+ *
* @param name The name under which the aggregator is registered.
* @param aggregator The aggregator class.
- *
+ *
* @return The DeltaIteration itself, to allow chaining function calls.
*/
@PublicEvolving
@@ -297,62 +297,61 @@ public class DeltaIteration<ST, WT> {
*/
@PublicEvolving
public <X extends Value> DeltaIteration<ST, WT> registerAggregationConvergenceCriterion(
- String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck)
- {
+ String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck) {
this.aggregators.registerAggregationConvergenceCriterion(name, aggregator, convergenceCheck);
return this;
}
-
+
/**
* Gets the registry for aggregators for the iteration.
- *
+ *
* @return The registry with all aggregators.
*/
@PublicEvolving
public AggregatorRegistry getAggregators() {
return this.aggregators;
}
-
+
/**
* Sets whether to keep the solution set in managed memory (safe against heap exhaustion) or unmanaged memory
* (objects on heap).
- *
+ *
* @param solutionSetUnManaged True to keep the solution set in unmanaged memory, false to keep it in managed memory.
- *
+ *
* @see #isSolutionSetUnManaged()
*/
public void setSolutionSetUnManaged(boolean solutionSetUnManaged) {
this.solutionSetUnManaged = solutionSetUnManaged;
}
-
+
/**
* gets whether the solution set is in managed or unmanaged memory.
- *
+ *
* @return True, if the solution set is in unmanaged memory (object heap), false if in managed memory.
- *
+ *
* @see #setSolutionSetUnManaged(boolean)
*/
public boolean isSolutionSetUnManaged() {
return solutionSetUnManaged;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
/**
* A {@link DataSet} that acts as a placeholder for the solution set during the iteration.
- *
+ *
* @param <ST> The type of the elements in the solution set.
*/
@Public
public static class SolutionSetPlaceHolder<ST> extends DataSet<ST>{
-
+
private final DeltaIteration<ST, ?> deltaIteration;
-
+
private SolutionSetPlaceHolder(ExecutionEnvironment context, TypeInformation<ST> type, DeltaIteration<ST, ?> deltaIteration) {
super(context, type);
this.deltaIteration = deltaIteration;
}
-
+
public void checkJoinKeyFields(int[] keyFields) {
int[] ssKeys = deltaIteration.keys.computeLogicalKeyPositions();
if (!Arrays.equals(ssKeys, keyFields)) {
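Tying the DeltaIteration pieces together (iterateDelta, getWorkset/getSolutionSet, registerAggregator, closeWith), here is a toy but runnable sketch; the step function is purely illustrative.

import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class DeltaIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, Long>> initial = env.fromElements(
                Tuple2.of(1L, 10L), Tuple2.of(2L, 20L));

        // solution set and workset start out identical; the key is tuple field 0
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                initial.iterateDelta(initial, 10, 0);
        iteration.name("toy delta iteration")
                 .registerAggregator("updates", new LongSumAggregator());

        // toy step: halve the solution-set value; a real job would join the
        // workset against other data to compute the next delta
        DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
                .join(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> join(Tuple2<Long, Long> ws, Tuple2<Long, Long> ss) {
                        return Tuple2.of(ws.f0, ss.f1 / 2);
                    }
                });

        // the delta is merged into the solution set, and also fed back as the new workset
        iteration.closeWith(delta, delta).print();
    }
}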
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
index e593488..de93dbb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.api.java.operators;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.operators.Keys;
@@ -24,19 +25,24 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+/**
+ * Resulting {@link DataSet} of a delta iteration operation.
+ * @param <ST>
+ * @param <WT>
+ */
@Public
public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
private DeltaIteration<ST, WT> iterationHead;
private DataSet<ST> nextSolutionSet;
-
+
private DataSet<WT> nextWorkset;
-
+
private Keys<ST> keys;
-
+
private int maxIterations;
-
+
private TypeInformation<WT> typeWS;
DeltaIterationResultSet(ExecutionEnvironment context,
@@ -46,8 +52,7 @@ public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
DataSet<ST> nextSolutionSet,
DataSet<WT> nextWorkset,
Keys<ST> keys,
- int maxIterations)
- {
+ int maxIterations) {
super(context, typeSS);
this.iterationHead = iterationHead;
this.nextWorkset = nextWorkset;
@@ -60,7 +65,7 @@ public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
public DeltaIteration<ST, WT> getIterationHead() {
return iterationHead;
}
-
+
public DataSet<ST> getNextSolutionSet() {
return nextSolutionSet;
}
@@ -78,7 +83,7 @@ public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
public int getMaxIterations() {
return maxIterations;
}
-
+
public TypeInformation<WT> getWorksetType() {
return typeWS;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 267513d..9dc2a9c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -111,7 +111,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
/**
* Sets the strategy to use for the combine phase of the reduce.
*
- * If this method is not called, then the default hint will be used.
+ * <p>If this method is not called, then the default hint will be used.
* ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint#OPTIMIZER_CHOOSES})
*
* @param strategy The hint to use.
@@ -132,8 +132,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
String name,
Operator<IN> input,
int parallelism,
- CombineHint hint)
- {
+ CombineHint hint) {
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
@@ -150,7 +149,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
}
@Internal
- public static final class DistinctFunction<T> implements ReduceFunction<T> {
+ private static final class DistinctFunction<T> implements ReduceFunction<T> {
private static final long serialVersionUID = 1L;
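The CombineHint Javadoc touched above is easiest to see in use; a minimal sketch asking for a hash-based combine phase instead of the default OPTIMIZER_CHOOSES.

import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctHintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> data = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 1), Tuple2.of("b", 2));

        // deduplicate on field 0; without the hint, the optimizer chooses the strategy
        data.distinct(0)
            .setCombineHint(CombineHint.HASH)
            .print();
    }
}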
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index ae8b5ea..901274e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -22,45 +22,45 @@ import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
/**
* This operator represents the application of a "filter" function on a data set, and the
* result data set produced by the function.
- *
+ *
* @param <T> The type of the data set filtered by the operator.
*/
@Public
public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
-
+
protected final FilterFunction<T> function;
-
+
protected final String defaultName;
public FilterOperator(DataSet<T> input, FilterFunction<T> function, String defaultName) {
super(input, input.getType());
-
+
this.function = function;
this.defaultName = defaultName;
UdfOperatorUtils.analyzeSingleInputUdf(this, FilterFunction.class, defaultName, function, null);
}
-
+
@Override
protected FilterFunction<T> getFunction() {
return function;
}
-
+
@Override
- protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
-
- String name = getName() != null ? getName() : "Filter at "+defaultName;
-
+ protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T, T>> translateToDataFlow(Operator<T> input) {
+
+ String name = getName() != null ? getName() : "Filter at " + defaultName;
+
// create operator
PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
po.setInput(input);
-
+
// set parallelism
if (getParallelism() > 0) {
// use specified parallelism
@@ -69,7 +69,7 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
// if no parallelism has been specified, use parallelism of input operator to enable chaining
po.setParallelism(input.getParallelism());
}
-
+
return po;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index ed730ae..56d7cb8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -29,48 +29,48 @@ import org.apache.flink.api.java.DataSet;
/**
* This operator represents the application of a "flatMap" function on a data set, and the
* result data set produced by the function.
- *
+ *
* @param <IN> The type of the data set consumed by the operator.
* @param <OUT> The type of the data set created by the operator.
*/
@Public
public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {
-
+
protected final FlatMapFunction<IN, OUT> function;
-
+
protected final String defaultName;
-
+
public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
super(input, resultType);
-
+
this.function = function;
this.defaultName = defaultName;
UdfOperatorUtils.analyzeSingleInputUdf(this, FlatMapFunction.class, defaultName, function, null);
}
-
+
@Override
protected FlatMapFunction<IN, OUT> getFunction() {
return function;
}
@Override
- protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
- String name = getName() != null ? getName() : "FlatMap at "+defaultName;
+ protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+ String name = getName() != null ? getName() : "FlatMap at " + defaultName;
// create operator
FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function,
- new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+ new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
po.setInput(input);
// set parallelism
- if(this.getParallelism() > 0) {
+ if (this.getParallelism() > 0) {
// use specified parallelism
po.setParallelism(this.getParallelism());
} else {
// if no parallelism has been specified, use parallelism of input operator to enable chaining
po.setParallelism(input.getParallelism());
}
-
+
return po;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 0c8e657..e4ed07f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
@@ -33,13 +34,12 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
/**
* This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data
- * locally in their partitions. The combine part can return an arbitrary data type. This is useful to pre-combine values
+ * locally in their partitions. The combine part can return an arbitrary data type. This is useful to pre-combine values
* into an intermediate representation before applying a proper reduce operation.
*
* @param <IN> The type of the data set consumed by the operator.
@@ -95,12 +95,12 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
SingleInputSemanticProperties props = super.getSemanticProperties();
// offset semantic information by extracted key fields
- if(props != null &&
+ if (props != null &&
this.grouper != null &&
this.grouper.keys instanceof SelectorFunctionKeys) {
- int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
- if(this.grouper instanceof SortedGrouping) {
+ int offset = ((SelectorFunctionKeys<?, ?>) this.grouper.keys).getKeyType().getTotalFields();
+ if (this.grouper instanceof SortedGrouping) {
offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
@@ -174,7 +174,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
Order[] sortOrders = sortedGrouper.getGroupSortOrders();
Ordering o = new Ordering();
- for(int i=0; i < sortKeyPositions.length; i++) {
+ for (int i = 0; i < sortKeyPositions.length; i++) {
o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
}
po.setGroupOrder(o);
@@ -187,7 +187,6 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
}
}
-
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
@@ -196,8 +195,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
GroupCombineFunction<IN, OUT> function,
TypeInformation<OUT> outputType,
String name,
- Operator<IN> input)
- {
+ Operator<IN> input) {
final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
@@ -218,10 +216,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
GroupCombineFunction<IN, OUT> function,
TypeInformation<OUT> outputType,
String name,
- Operator<IN> input)
- {
+ Operator<IN> input) {
final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
- final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>)rawSortingKeys;
+ final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKeys;
TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey, sortingKey);
Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);
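As the class Javadoc above describes, combineGroup runs only the local pre-combine. A sketch of partial word counts; in a real job the partials would feed a subsequent groupBy/reduceGroup to produce the final counts.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombineGroupSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> words = env.fromElements(
                Tuple2.of("flink", 1), Tuple2.of("flink", 1), Tuple2.of("java", 1));
        DataSet<Tuple2<String, Integer>> partials = words
                .groupBy(0)
                .combineGroup(new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void combine(Iterable<Tuple2<String, Integer>> values,
                                        Collector<Tuple2<String, Integer>> out) {
                        String word = null;
                        int sum = 0;
                        for (Tuple2<String, Integer> v : values) {
                            word = v.f0;
                            sum += v.f1;
                        }
                        out.collect(Tuple2.of(word, sum)); // partial, per-partition count
                    }
                });
        partials.print();
    }
}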
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index b339baf..069ac44 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.functions.CombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.operators.translation.CombineToGroupCombineWrapper;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
@@ -32,15 +33,15 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
-import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.operators.translation.CombineToGroupCombineWrapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
import org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.DataSet;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ import java.lang.reflect.Type;
/**
* This operator represents the application of a "reduceGroup" function on a data set, and the
* result data set produced by the function.
- *
+ *
* @param <IN> The type of the data set consumed by the operator.
* @param <OUT> The type of the data set created by the operator.
*/
@@ -62,14 +63,14 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
private GroupReduceFunction<IN, OUT> function;
private final Grouping<IN> grouper;
-
+
private final String defaultName;
private boolean combinable;
/**
* Constructor for a non-grouped reduce (all reduce).
- *
+ *
* @param input The input data set to the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
@@ -82,10 +83,10 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
this.combinable = checkCombinability();
}
-
+
/**
* Constructor for a grouped reduce.
- *
+ *
* @param input The grouped input to be processed group-wise by the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
@@ -149,14 +150,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
}
return false;
}
-
-
+
@Override
protected GroupReduceFunction<IN, OUT> getFunction() {
return function;
}
-
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
@@ -164,10 +163,10 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
public boolean isCombinable() {
return combinable;
}
-
+
public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {
- if(combinable) {
+ if (combinable) {
// sanity check that the function is a subclass of the combine interface
if (!checkCombinability()) {
throw new IllegalArgumentException("Either the function does not implement a combine interface, " +
@@ -188,12 +187,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
SingleInputSemanticProperties props = super.getSemanticProperties();
// offset semantic information by extracted key fields
- if(props != null &&
+ if (props != null &&
this.grouper != null &&
this.grouper.keys instanceof SelectorFunctionKeys) {
- int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
- if(this.grouper instanceof SortedGrouping) {
+ int offset = ((SelectorFunctionKeys<?, ?>) this.grouper.keys).getKeyType().getTotalFields();
+ if (this.grouper instanceof SortedGrouping) {
offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
}
props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
@@ -205,7 +204,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
// --------------------------------------------------------------------------------------------
// Translation
// --------------------------------------------------------------------------------------------
-
+
@Override
@SuppressWarnings("unchecked")
protected GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
@@ -225,16 +224,16 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po =
new GroupReduceOperatorBase<>(function, operatorInfo, new int[0], name);
-
+
po.setCombinable(combinable);
po.setInput(input);
// the parallelism for a non grouped reduce can only be 1
po.setParallelism(1);
return po;
}
-
+
if (grouper.getKeys() instanceof SelectorFunctionKeys) {
-
+
@SuppressWarnings("unchecked")
SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys();
@@ -271,29 +270,28 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
po.setInput(input);
po.setParallelism(getParallelism());
po.setCustomPartitioner(grouper.getCustomPartitioner());
-
+
// set group order
if (grouper instanceof SortedGrouping) {
SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
Order[] sortOrders = sortedGrouper.getGroupSortOrders();
-
+
Ordering o = new Ordering();
- for(int i=0; i < sortKeyPositions.length; i++) {
+ for (int i = 0; i < sortKeyPositions.length; i++) {
o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
}
po.setGroupOrder(o);
}
-
+
return po;
}
else {
throw new UnsupportedOperationException("Unrecognized key type.");
}
}
-
-
+
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
@@ -303,8 +301,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
TypeInformation<OUT> outputType,
String name,
Operator<IN> input,
- boolean combinable)
- {
+ boolean combinable) {
SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
@@ -326,11 +323,10 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
TypeInformation<OUT> outputType,
String name,
Operator<IN> input,
- boolean combinable)
- {
+ boolean combinable) {
final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKey;
- TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey,sortingKey);
+ TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey, sortingKey);
Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);
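The combinability logic refactored above (checkCombinability, setCombinable) keys off the interfaces the user function implements. A sketch of a reduce function that doubles as its own combiner, so the resulting GroupReduceOperator reports isCombinable() == true without any explicit setCombinable call.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombinableReduceSketch {

    // implements both interfaces, so the operator becomes combinable automatically
    public static class SumReducer implements
            GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
            GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values,
                           Collector<Tuple2<String, Integer>> out) {
            String key = null;
            int sum = 0;
            for (Tuple2<String, Integer> v : values) {
                key = v.f0;
                sum += v.f1;
            }
            out.collect(Tuple2.of(key, sum));
        }

        @Override
        public void combine(Iterable<Tuple2<String, Integer>> values,
                            Collector<Tuple2<String, Integer>> out) {
            reduce(values, out); // summing is associative, so combine == reduce
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> data = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));
        data.groupBy(0).reduceGroup(new SumReducer()).print();
    }
}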
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index dbaaa9d..74bd9e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -26,8 +26,9 @@ import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.DataSet;
/**
- * Grouping is an intermediate step for a transformation on a grouped DataSet.<br>
- * The following transformation can be applied on Grouping:
+ * Grouping is an intermediate step for a transformation on a grouped DataSet.
+ *
+ * <p>The following transformation can be applied on Grouping:
* <ul>
* <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
* <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}, and</li>
@@ -35,24 +36,23 @@ import org.apache.flink.api.java.DataSet;
* </ul>
*
* @param <T> The type of the elements of the grouped DataSet.
- *
+ *
* @see DataSet
*/
@Public
public abstract class Grouping<T> {
-
+
protected final DataSet<T> inputDataSet;
-
+
protected final Keys<T> keys;
-
+
protected Partitioner<?> customPartitioner;
-
public Grouping(DataSet<T> set, Keys<T> keys) {
if (set == null || keys == null) {
throw new NullPointerException();
}
-
+
if (keys.isEmpty()) {
throw new InvalidProgramException("The grouping keys must not be empty.");
}
@@ -60,17 +60,17 @@ public abstract class Grouping<T> {
this.inputDataSet = set;
this.keys = keys;
}
-
+
/**
* Returns the input DataSet of a grouping operation, that is the one before the grouping. This means that
* if it is applied directly to the result of a grouping operation, it will cancel its effect. As an example, in the
* following snippet:
- * <pre><code>
+ * <pre>{@code
* DataSet<X> notGrouped = input.groupBy().getDataSet();
* DataSet<Y> allReduced = notGrouped.reduce()
- * </pre></code>
- * the <code>groupBy()</code> is as if it never happened, as the <code>notGrouped</code> DataSet corresponds
- * to the input of the <code>groupBy()</code> (because of the <code>getDataset()</code>).
+ * }</pre>
+ * the {@code groupBy()} is as if it never happened, as the {@code notGrouped} DataSet corresponds
+ * to the input of the {@code groupBy()} (because of the {@code getDataset()}).
* */
@Internal
public DataSet<T> getInputDataSet() {
@@ -81,11 +81,11 @@ public abstract class Grouping<T> {
public Keys<T> getKeys() {
return this.keys;
}
-
+
/**
* Gets the custom partitioner to be used for this grouping, or {@code null}, if
* none was defined.
- *
+ *
* @return The custom partitioner to be used for this grouping.
*/
@Internal
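For orientation, the transformations listed in the Grouping Javadoc above are applied to the grouped set, not to Grouping directly; a minimal grouped-reduce sketch.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupedReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> data = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));
        data.groupBy(0) // returns an UnsortedGrouping, a subclass of Grouping
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                      Tuple2<String, Integer> b) {
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            })
            .print();
    }
}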
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
index c7ff6ab..d522f02 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
@@ -31,9 +31,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.types.Value;
/**
- * The IterativeDataSet represents the start of an iteration. It is created from the DataSet that
+ * The IterativeDataSet represents the start of an iteration. It is created from the DataSet that
* represents the initial solution set via the {@link DataSet#iterate(int)} method.
- *
+ *
* @param <T> The data type of set that is the input and feedback of the iteration.
*
* @see DataSet#iterate(int)
@@ -42,33 +42,33 @@ import org.apache.flink.types.Value;
public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeDataSet<T>> {
private final AggregatorRegistry aggregators = new AggregatorRegistry();
-
+
private int maxIterations;
public IterativeDataSet(ExecutionEnvironment context, TypeInformation<T> type, DataSet<T> input, int maxIterations) {
super(input, type);
this.maxIterations = maxIterations;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
/**
* Closes the iteration. This method defines the end of the iterative program part.
- *
+ *
* @param iterationResult The data set that will be fed back to the next iteration.
* @return The DataSet that represents the result of the iteration, after the computation has terminated.
- *
+ *
* @see DataSet#iterate(int)
*/
public DataSet<T> closeWith(DataSet<T> iterationResult) {
return new BulkIterationResultSet<T>(getExecutionEnvironment(), getType(), this, iterationResult);
}
-
+
/**
* Closes the iteration and specifies a termination criterion. This method defines the end of
* the iterative program part.
- * <p>
- * The termination criterion is a means of dynamically signaling the iteration to halt. It is expressed via a data
+ *
+ * <p>The termination criterion is a means of dynamically signaling the iteration to halt. It is expressed via a data
* set that will trigger to halt the loop as soon as the data set is empty. A typical way of using the termination
* criterion is to have a filter that filters out all elements that are considered non-converged. As soon as no more
* such elements exist, the iteration finishes.
@@ -76,7 +76,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
* @param iterationResult The data set that will be fed back to the next iteration.
* @param terminationCriterion The data set that being used to trigger halt on operation once it is empty.
* @return The DataSet that represents the result of the iteration, after the computation has terminated.
- *
+ *
* @see DataSet#iterate(int)
*/
public DataSet<T> closeWith(DataSet<T> iterationResult, DataSet<?> terminationCriterion) {
@@ -85,25 +85,25 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
/**
* Gets the maximum number of iterations.
- *
+ *
* @return The maximum number of iterations.
*/
public int getMaxIterations() {
return maxIterations;
}
-
+
/**
* Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the
* iteration, such as number of elements processed. The aggregators compute global aggregates: After each iteration step,
* the values are globally aggregated to produce one aggregate that represents statistics across all parallel instances.
* The value of an aggregator can be accessed in the next iteration.
- * <p>
- * Aggregators can be accessed inside a function via the
+ *
+ * <p>Aggregators can be accessed inside a function via the
* {@link org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()} method.
- *
+ *
* @param name The name under which the aggregator is registered.
* @param aggregator The aggregator class.
- *
+ *
* @return The IterativeDataSet itself, to allow chaining function calls.
*/
@PublicEvolving
@@ -111,7 +111,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
this.aggregators.registerAggregator(name, aggregator);
return this;
}
-
+
/**
* Registers an {@link Aggregator} for the iteration together with a {@link ConvergenceCriterion}. For a general description
* of aggregators, see {@link #registerAggregator(String, Aggregator)} and {@link Aggregator}.
@@ -119,33 +119,32 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
* the iteration should terminate. A typical use case is to have an aggregator that sums up the total error of change
* in an iteration step and have to have a convergence criterion that signals termination as soon as the aggregate value
* is below a certain threshold.
- *
+ *
* @param name The name under which the aggregator is registered.
* @param aggregator The aggregator class.
* @param convergenceCheck The convergence criterion.
- *
+ *
* @return The IterativeDataSet itself, to allow chaining function calls.
*/
@PublicEvolving
public <X extends Value> IterativeDataSet<T> registerAggregationConvergenceCriterion(
- String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck)
- {
+ String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck) {
this.aggregators.registerAggregationConvergenceCriterion(name, aggregator, convergenceCheck);
return this;
}
-
+
/**
- * Gets the registry for aggregators. On the registry, one can add {@link Aggregator}s and an aggregator-based
+ * Gets the registry for aggregators. On the registry, one can add {@link Aggregator}s and an aggregator-based
* {@link ConvergenceCriterion}. This method offers an alternative way to registering the aggregators via
- * {@link #registerAggregator(String, Aggregator)} and {@link #registerAggregationConvergenceCriterion(String, Aggregator, ConvergenceCriterion))}.
- *
+ * {@link #registerAggregator(String, Aggregator)} and {@link #registerAggregationConvergenceCriterion(String, Aggregator, ConvergenceCriterion)}.
+ *
* @return The registry for aggregators.
*/
@PublicEvolving
public AggregatorRegistry getAggregators() {
return aggregators;
}
-
+
// --------------------------------------------------------------------------------------------
@Override
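Finally, the bulk-iteration Javadoc above (closeWith with a termination criterion) translates to code like the following; the map/filter logic is illustrative. The loop halts as soon as the criterion set is empty, or after maxIterations, whichever comes first.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class BulkIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        IterativeDataSet<Long> loop = env.fromElements(1L, 2L, 3L).iterate(100);

        DataSet<Long> doubled = loop.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long v) {
                return v * 2;
            }
        });

        // "non-converged" elements; once every value crosses the (arbitrary)
        // threshold, this set is empty and the iteration terminates early
        DataSet<Long> notConverged = doubled.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long v) {
                return v < 1_000_000L;
            }
        });

        loop.closeWith(doubled, notConverged).print();
    }
}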