You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/20 12:19:57 UTC
incubator-flink git commit: Minor code clean up
Repository: incubator-flink
Updated Branches:
refs/heads/master 5c3dceb9c -> 4203bf99d
Minor code clean up
This closes #221.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4203bf99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4203bf99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4203bf99
Branch: refs/heads/master
Commit: 4203bf99ddd009b72681ba0dc554cabd47e1c85c
Parents: 5c3dceb
Author: Suneel Marthi <su...@gmail.com>
Authored: Wed Nov 19 18:22:09 2014 -0500
Committer: uce <uc...@apache.org>
Committed: Thu Nov 20 12:17:59 2014 +0100
----------------------------------------------------------------------
.../flink/client/CliFrontendTestUtils.java | 6 ++---
.../flink/configuration/Configuration.java | 14 ++++++------
.../configuration/GlobalConfiguration.java | 4 ++--
.../org/apache/flink/core/fs/FileSystem.java | 24 +++++---------------
.../java/org/apache/flink/core/fs/Path.java | 4 ++--
.../flink/core/fs/local/LocalFileSystem.java | 6 ++---
.../clustering/util/KMeansDataGenerator.java | 2 +-
.../iterative/io/SerializedUpdateBuffer.java | 20 +++++++++-------
.../iterative/task/SyncEventHandler.java | 8 +------
.../runtime/jobgraph/InputFormatVertex.java | 2 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 7 +++---
.../scala/operators/ScalaAggregateOperator.java | 20 ++++++----------
12 files changed, 48 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 9d0d526..9d4c6ae 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -19,19 +19,17 @@
package org.apache.flink.client;
-import static org.junit.Assert.fail;
-
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.util.Map;
-
-import org.apache.flink.client.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import static org.junit.Assert.fail;
+
public class CliFrontendTestUtils {
public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index e798609..fa9378a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -147,11 +147,11 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
return (Integer) o;
}
else if (o.getClass() == Long.class) {
- long value = ((Long) o).longValue();
+ long value = (Long) o;
if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
return (int) value;
} else {
- LOG.warn("Configuation value " + value + " overflows/underflows the integer type.");
+ LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
return defaultValue;
}
}
@@ -160,7 +160,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
return Integer.parseInt(o.toString());
}
catch (NumberFormatException e) {
- LOG.warn("Configuration cannot evaluate value " + o + " as an integer number");
+ LOG.warn("Configuration cannot evaluate value {} as an integer number", o);
return defaultValue;
}
}
@@ -280,7 +280,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
return (float) value;
} else {
- LOG.warn("Configuation value " + value + " overflows/underflows the float type.");
+ LOG.warn("Configuration value {} overflows/underflows the float type.", value);
return defaultValue;
}
}
@@ -289,7 +289,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
return Float.parseFloat(o.toString());
}
catch (NumberFormatException e) {
- LOG.warn("Configuration cannot evaluate value " + o + " as a float value");
+ LOG.warn("Configuration cannot evaluate value {} as a float value", o);
return defaultValue;
}
}
@@ -333,7 +333,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
return Double.parseDouble(o.toString());
}
catch (NumberFormatException e) {
- LOG.warn("Configuration cannot evaluate value " + o + " as a double value");
+ LOG.warn("Configuration cannot evaluate value {} as a double value", o);
return defaultValue;
}
}
@@ -370,7 +370,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
return (byte[]) o;
}
else {
- LOG.warn("Configuration cannot evaluate value " + o + " as a byte[] value");
+ LOG.warn("Configuration cannot evaluate value {} as a byte[] value", o);
return defaultValue;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index e57aa6c..7d40dbc 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -288,8 +288,8 @@ public final class GlobalConfiguration {
try {
final DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
- Document doc = null;
- Element root = null;
+ Document doc;
+ Element root;
doc = builder.parse(file);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 91780fd..9483290 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -111,21 +111,10 @@ public abstract class FileSystem {
}
if ((this.authority == null) || (key.authority == null)) {
-
- if (this.authority == null && key.authority == null) {
- return true;
- }
-
- return false;
+ return this.authority == null && key.authority == null;
}
-
- if (!this.authority.equals(key.authority)) {
- return false;
- }
-
- return true;
+ return this.authority.equals(key.authority);
}
-
return false;
}
@@ -234,7 +223,7 @@ public abstract class FileSystem {
+ ", referenced in file URI '" + uri.toString() + "'.");
}
- Class<? extends FileSystem> fsClass = null;
+ Class<? extends FileSystem> fsClass;
try {
fsClass = ClassUtils.getFileSystemByName(FSDIRECTORY.get(uri.getScheme()));
} catch (ClassNotFoundException e1) {
@@ -693,10 +682,9 @@ public abstract class FileSystem {
// file is a directory
final FileStatus[] files = this.listStatus(file.getPath());
- for (int i = 0; i < files.length; i++) {
-
- if (!files[i].isDir()) {
- numberOfBlocks += getNumberOfBlocks(files[i].getLen(), files[i].getBlockSize());
+ for (FileStatus file1 : files) {
+ if (!file1.isDir()) {
+ numberOfBlocks += getNumberOfBlocks(file1.getLen(), file1.getBlockSize());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 0ed4bcb..50fa63b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -273,7 +273,7 @@ public class Path implements IOReadableWritable, Serializable {
}
final int start = slashed ? 1 : 0;
return path.length() >= start + 2
- && (slashed ? path.charAt(0) == '/' : true)
+ && (!slashed || path.charAt(0) == '/')
&& path.charAt(start + 1) == ':'
&& ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
.charAt(start) <= 'z'));
@@ -358,7 +358,7 @@ public class Path implements IOReadableWritable, Serializable {
// we can't use uri.toString(), which escapes everything, because we
// want
// illegal characters unescaped in the string, for glob processing, etc.
- final StringBuffer buffer = new StringBuffer();
+ final StringBuilder buffer = new StringBuilder();
if (uri.getScheme() != null) {
buffer.append(uri.getScheme());
buffer.append(":");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index a33720b..d4b570a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -199,9 +199,9 @@ public class LocalFileSystem extends FileSystem {
if (f.isDirectory()) {
final File[] files = f.listFiles();
- for (int i = 0; i < files.length; i++) {
- final boolean del = delete(files[i]);
- if (del == false) {
+ for (File file : files) {
+ final boolean del = delete(file);
+ if (!del) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
index d6f4121..897e0ca 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
@@ -134,7 +134,7 @@ public class KMeansDataGenerator {
System.out.println("Wrote "+k+" cluster centers to "+tmpDir+"/"+CENTERS_FILE);
}
- private static final double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
+ private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
final double halfRange = range / 2;
final double[][] points = new double[num][dimensionality];
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index a02e81c..5896dcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
@@ -160,7 +161,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
final ReadEnd readEnd;
if (numBuffersSpilled == 0 && emptyBuffers.size() >= minBuffersForWriteEnd) {
// read completely from in-memory segments
- readEnd = new ReadEnd(fullBuffers.removeFirst(), emptyBuffers, fullBuffers, null, null, segmentSize, 0);
+ readEnd = new ReadEnd(fullBuffers.removeFirst(), emptyBuffers, fullBuffers, null, null, 0);
} else {
int toSpill = Math.min(minBuffersForSpilledReadEnd + minBuffersForWriteEnd - emptyBuffers.size(),
fullBuffers.size());
@@ -184,7 +185,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
final BlockChannelReader reader = ioManager.createBlockChannelReader(currentWriter.getChannelID());
// gather some memory segments to circulate while reading back the data
- final ArrayList<MemorySegment> readSegments = new ArrayList<MemorySegment>();
+ final List<MemorySegment> readSegments = new ArrayList<MemorySegment>();
try {
while (readSegments.size() < minBuffersForSpilledReadEnd) {
readSegments.add(emptyBuffers.take());
@@ -196,8 +197,8 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
firstSeg = reader.getReturnQueue().take();
// create the read end reading one less buffer, because the first buffer is already read back
- readEnd = new ReadEnd(firstSeg, emptyBuffers, fullBuffers, reader, readSegments, segmentSize,
- numBuffersSpilled - 1);
+ readEnd = new ReadEnd(firstSeg, emptyBuffers, fullBuffers, reader, readSegments,
+ numBuffersSpilled - 1);
} catch (InterruptedException e) {
throw new RuntimeException(
"SerializedUpdateBuffer was interrupted while reclaiming memory by spilling.", e);
@@ -224,10 +225,11 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
try {
currentWriter.closeAndDelete();
} catch (Throwable t) {
+ // do nothing
}
}
- ArrayList<MemorySegment> freeMem = new ArrayList<MemorySegment>(64);
+ List<MemorySegment> freeMem = new ArrayList<MemorySegment>(64);
// add all memory allocated to the write end
freeMem.add(getCurrentSegment());
@@ -259,7 +261,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
private final LinkedBlockingQueue<MemorySegment> emptyBufferTarget;
- private final ArrayDeque<MemorySegment> fullBufferSource;
+ private final Deque<MemorySegment> fullBufferSource;
private final BlockChannelReader spilledBufferSource;
@@ -268,8 +270,8 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
private int requestsRemaining;
private ReadEnd(MemorySegment firstMemSegment, LinkedBlockingQueue<MemorySegment> emptyBufferTarget,
- ArrayDeque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource,
- ArrayList<MemorySegment> emptyBuffers, int segmentSize, int numBuffersSpilled)
+ Deque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource,
+ List<MemorySegment> emptyBuffers, int numBuffersSpilled)
throws IOException {
super(firstMemSegment, firstMemSegment.getInt(0), HEADER_LENGTH);
@@ -337,6 +339,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
try {
spilledBufferSource.closeAndDelete();
} catch (Throwable t) {
+ // do nothing
}
}
return true;
@@ -365,6 +368,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
try {
spilledBufferSource.closeAndDelete();
} catch (Throwable t) {
+ // do nothing
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 72530f0..780b34b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -30,8 +30,6 @@ import com.google.common.base.Preconditions;
public class SyncEventHandler implements EventListener {
-// private static final Logger log = LoggerFactory.getLogger(SyncEventHandler.class);
-
private final ClassLoader userCodeClassLoader;
private final Map<String, Aggregator<?>> aggregators;
@@ -65,11 +63,7 @@ public class SyncEventHandler implements EventListener {
}
workerDoneEventCounter++;
-
-// if (log.isInfoEnabled()) {
-// log.info("Sync event handler received WorkerDoneEvent event (" + workerDoneEventCounter + ")");
-// }
-
+
String[] aggNames = workerDoneEvent.getAggregatorNames();
Value[] aggregates = workerDoneEvent.getAggregates(userCodeClassLoader);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index c503ef6..8ee4da4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -43,7 +43,7 @@ public class InputFormatVertex extends AbstractJobVertex {
public void initializeOnMaster(ClassLoader loader) throws Exception {
if (inputFormat == null) {
TaskConfig cfg = new TaskConfig(getConfiguration());
- UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.<InputFormat<?, ?>>getStubWrapper(loader);
+ UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.getStubWrapper(loader);
if (wrapper == null) {
throw new Exception("No input format present in InputFormatVertex's task configuration.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index be691f0..e2568a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -29,6 +29,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.configuration.Configuration;
@@ -266,8 +267,8 @@ public class JobGraph implements IOReadableWritable {
return Collections.emptyList();
}
- ArrayList<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
- LinkedHashSet<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());
+ List<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
+ Set<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());
// start by finding the vertices with no input edges
// and the ones with disconnected inputs (that refer to some standalone data set)
@@ -301,7 +302,7 @@ public class JobGraph implements IOReadableWritable {
return sorted;
}
- private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, ArrayList<AbstractJobVertex> target, LinkedHashSet<AbstractJobVertex> remaining) {
+ private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, List<AbstractJobVertex> target, Set<AbstractJobVertex> remaining) {
// forward traverse over all produced data sets and all their consumers
for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index 3d76921..293b380 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.scala.operators;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.Validate;
@@ -166,7 +165,7 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
genName.setLength(genName.length()-1);
TypeSerializer<IN> serializer = getInputType().createSerializer();
- TypeSerializerFactory<IN> serializerFactory = null;
+ TypeSerializerFactory<IN> serializerFactory;
if (serializer.isStateful()) {
serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
serializer, getInputType().getTypeClass());
@@ -214,12 +213,9 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
SingleInputSemanticProperties props = new SingleInputSemanticProperties();
- for (int i = 0; i < logicalKeyPositions.length; i++) {
- int keyField = logicalKeyPositions[i];
+ for (int keyField : logicalKeyPositions) {
boolean keyFieldUsedInAgg = false;
-
- for (int k = 0; k < fields.length; k++) {
- int aggField = fields[k];
+ for (int aggField : fields) {
if (keyField == aggField) {
keyFieldUsedInAgg = true;
break;
@@ -273,8 +269,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
@Override
public void open(Configuration parameters) throws Exception {
- for (int i = 0; i < aggFunctions.length; i++) {
- aggFunctions[i].initializeAggregate();
+ for (AggregationFunction<Object> aggFunction : aggFunctions) {
+ aggFunction.initializeAggregate();
}
serializer = (TupleSerializerBase<T>)serializerFactory.getSerializer();
}
@@ -287,10 +283,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
// aggregators are initialized from before
T current = null;
- final Iterator<T> values = records.iterator();
- while (values.hasNext()) {
- current = values.next();
-
+ for (T record : records) {
+ current = record;
for (int i = 0; i < fieldPositions.length; i++) {
Object val = current.productElement(fieldPositions[i]);
aggFunctions[i].aggregate(val);