You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/20 12:19:57 UTC

incubator-flink git commit: Minor code clean up

Repository: incubator-flink
Updated Branches:
  refs/heads/master 5c3dceb9c -> 4203bf99d


Minor code clean up

This closes #221.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4203bf99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4203bf99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4203bf99

Branch: refs/heads/master
Commit: 4203bf99ddd009b72681ba0dc554cabd47e1c85c
Parents: 5c3dceb
Author: Suneel Marthi <su...@gmail.com>
Authored: Wed Nov 19 18:22:09 2014 -0500
Committer: uce <uc...@apache.org>
Committed: Thu Nov 20 12:17:59 2014 +0100

----------------------------------------------------------------------
 .../flink/client/CliFrontendTestUtils.java      |  6 ++---
 .../flink/configuration/Configuration.java      | 14 ++++++------
 .../configuration/GlobalConfiguration.java      |  4 ++--
 .../org/apache/flink/core/fs/FileSystem.java    | 24 +++++---------------
 .../java/org/apache/flink/core/fs/Path.java     |  4 ++--
 .../flink/core/fs/local/LocalFileSystem.java    |  6 ++---
 .../clustering/util/KMeansDataGenerator.java    |  2 +-
 .../iterative/io/SerializedUpdateBuffer.java    | 20 +++++++++-------
 .../iterative/task/SyncEventHandler.java        |  8 +------
 .../runtime/jobgraph/InputFormatVertex.java     |  2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  7 +++---
 .../scala/operators/ScalaAggregateOperator.java | 20 ++++++----------
 12 files changed, 48 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 9d0d526..9d4c6ae 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -19,19 +19,17 @@
 
 package org.apache.flink.client;
 
-import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.util.Map;
-
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 
+import static org.junit.Assert.fail;
+
 public class CliFrontendTestUtils {
 	
 	public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index e798609..fa9378a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -147,11 +147,11 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 			return (Integer) o;
 		}
 		else if (o.getClass() == Long.class) {
-			long value = ((Long) o).longValue();
+			long value = (Long) o;
 			if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
 				return (int) value;
 			} else {
-				LOG.warn("Configuation value " + value + " overflows/underflows the integer type.");
+				LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
 				return defaultValue;
 			}
 		}
@@ -160,7 +160,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 				return Integer.parseInt(o.toString());
 			}
 			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value " + o + " as an integer number");
+				LOG.warn("Configuration cannot evaluate value {} as an integer number", o);
 				return defaultValue;
 			}
 		}
@@ -280,7 +280,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 			if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
 				return (float) value;
 			} else {
-				LOG.warn("Configuation value " + value + " overflows/underflows the float type.");
+				LOG.warn("Configuration value {} overflows/underflows the float type.", value);
 				return defaultValue;
 			}
 		}
@@ -289,7 +289,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 				return Float.parseFloat(o.toString());
 			}
 			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value " + o + " as a float value");
+				LOG.warn("Configuration cannot evaluate value {} as a float value", o);
 				return defaultValue;
 			}
 		}
@@ -333,7 +333,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 				return Double.parseDouble(o.toString());
 			}
 			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value " + o + " as a double value");
+				LOG.warn("Configuration cannot evaluate value {} as a double value", o);
 				return defaultValue;
 			}
 		}
@@ -370,7 +370,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 			return (byte[]) o;
 		}
 		else {
-			LOG.warn("Configuration cannot evaluate value " + o + " as a byte[] value");
+			LOG.warn("Configuration cannot evaluate value {} as a byte[] value", o);
 			return defaultValue;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index e57aa6c..7d40dbc 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -288,8 +288,8 @@ public final class GlobalConfiguration {
 		try {
 
 			final DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-			Document doc = null;
-			Element root = null;
+			Document doc;
+			Element root;
 
 			doc = builder.parse(file);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 91780fd..9483290 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -111,21 +111,10 @@ public abstract class FileSystem {
 				}
 
 				if ((this.authority == null) || (key.authority == null)) {
-
-					if (this.authority == null && key.authority == null) {
-						return true;
-					}
-
-					return false;
+					return this.authority == null && key.authority == null;
 				}
-
-				if (!this.authority.equals(key.authority)) {
-					return false;
-				}
-
-				return true;
+				return this.authority.equals(key.authority);
 			}
-
 			return false;
 		}
 
@@ -234,7 +223,7 @@ public abstract class FileSystem {
 						+ ", referenced in file URI '" + uri.toString() + "'.");
 			}
 
-			Class<? extends FileSystem> fsClass = null;
+			Class<? extends FileSystem> fsClass;
 			try {
 				fsClass = ClassUtils.getFileSystemByName(FSDIRECTORY.get(uri.getScheme()));
 			} catch (ClassNotFoundException e1) {
@@ -693,10 +682,9 @@ public abstract class FileSystem {
 
 		// file is a directory
 		final FileStatus[] files = this.listStatus(file.getPath());
-		for (int i = 0; i < files.length; i++) {
-
-			if (!files[i].isDir()) {
-				numberOfBlocks += getNumberOfBlocks(files[i].getLen(), files[i].getBlockSize());
+		for (FileStatus file1 : files) {
+			if (!file1.isDir()) {
+				numberOfBlocks += getNumberOfBlocks(file1.getLen(), file1.getBlockSize());
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 0ed4bcb..50fa63b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -273,7 +273,7 @@ public class Path implements IOReadableWritable, Serializable {
 		}
 		final int start = slashed ? 1 : 0;
 		return path.length() >= start + 2
-			&& (slashed ? path.charAt(0) == '/' : true)
+			&& (!slashed || path.charAt(0) == '/')
 			&& path.charAt(start + 1) == ':'
 			&& ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
 				.charAt(start) <= 'z'));
@@ -358,7 +358,7 @@ public class Path implements IOReadableWritable, Serializable {
 		// we can't use uri.toString(), which escapes everything, because we
 		// want
 		// illegal characters unescaped in the string, for glob processing, etc.
-		final StringBuffer buffer = new StringBuffer();
+		final StringBuilder buffer = new StringBuilder();
 		if (uri.getScheme() != null) {
 			buffer.append(uri.getScheme());
 			buffer.append(":");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index a33720b..d4b570a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -199,9 +199,9 @@ public class LocalFileSystem extends FileSystem {
 		if (f.isDirectory()) {
 
 			final File[] files = f.listFiles();
-			for (int i = 0; i < files.length; i++) {
-				final boolean del = delete(files[i]);
-				if (del == false) {
+			for (File file : files) {
+				final boolean del = delete(file);
+				if (!del) {
 					return false;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
index d6f4121..897e0ca 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
@@ -134,7 +134,7 @@ public class KMeansDataGenerator {
 		System.out.println("Wrote "+k+" cluster centers to "+tmpDir+"/"+CENTERS_FILE);
 	}
 	
-	private static final double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
+	private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
 		final double halfRange = range / 2;
 		final double[][] points = new double[num][dimensionality];
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index a02e81c..5896dcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -160,7 +161,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 		final ReadEnd readEnd;
 		if (numBuffersSpilled == 0 && emptyBuffers.size() >= minBuffersForWriteEnd) {
 			// read completely from in-memory segments
-			readEnd = new ReadEnd(fullBuffers.removeFirst(), emptyBuffers, fullBuffers, null, null, segmentSize, 0);
+			readEnd = new ReadEnd(fullBuffers.removeFirst(), emptyBuffers, fullBuffers, null, null, 0);
 		} else {
 			int toSpill = Math.min(minBuffersForSpilledReadEnd + minBuffersForWriteEnd - emptyBuffers.size(),
 				fullBuffers.size());
@@ -184,7 +185,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 			final BlockChannelReader reader = ioManager.createBlockChannelReader(currentWriter.getChannelID());
 
 			// gather some memory segments to circulate while reading back the data
-			final ArrayList<MemorySegment> readSegments = new ArrayList<MemorySegment>();
+			final List<MemorySegment> readSegments = new ArrayList<MemorySegment>();
 			try {
 				while (readSegments.size() < minBuffersForSpilledReadEnd) {
 					readSegments.add(emptyBuffers.take());
@@ -196,8 +197,8 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 				firstSeg = reader.getReturnQueue().take();
 
 				// create the read end reading one less buffer, because the first buffer is already read back
-				readEnd = new ReadEnd(firstSeg, emptyBuffers, fullBuffers, reader, readSegments, segmentSize,
-					numBuffersSpilled - 1);
+				readEnd = new ReadEnd(firstSeg, emptyBuffers, fullBuffers, reader, readSegments,
+						numBuffersSpilled - 1);
 			} catch (InterruptedException e) {
 				throw new RuntimeException(
 					"SerializedUpdateBuffer was interrupted while reclaiming memory by spilling.", e);
@@ -224,10 +225,11 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 			try {
 				currentWriter.closeAndDelete();
 			} catch (Throwable t) {
+				// do nothing
 			}
 		}
 
-		ArrayList<MemorySegment> freeMem = new ArrayList<MemorySegment>(64);
+		List<MemorySegment> freeMem = new ArrayList<MemorySegment>(64);
 
 		// add all memory allocated to the write end
 		freeMem.add(getCurrentSegment());
@@ -259,7 +261,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 		private final LinkedBlockingQueue<MemorySegment> emptyBufferTarget;
 
-		private final ArrayDeque<MemorySegment> fullBufferSource;
+		private final Deque<MemorySegment> fullBufferSource;
 
 		private final BlockChannelReader spilledBufferSource;
 
@@ -268,8 +270,8 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 		private int requestsRemaining;
 
 		private ReadEnd(MemorySegment firstMemSegment, LinkedBlockingQueue<MemorySegment> emptyBufferTarget,
-				ArrayDeque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource,
-				ArrayList<MemorySegment> emptyBuffers, int segmentSize, int numBuffersSpilled)
+										Deque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource,
+										List<MemorySegment> emptyBuffers, int numBuffersSpilled)
 			throws IOException {
 			super(firstMemSegment, firstMemSegment.getInt(0), HEADER_LENGTH);
 
@@ -337,6 +339,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 						try {
 							spilledBufferSource.closeAndDelete();
 						} catch (Throwable t) {
+							// do nothing
 						}
 					}
 					return true;
@@ -365,6 +368,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 				try {
 					spilledBufferSource.closeAndDelete();
 				} catch (Throwable t) {
+					// do nothing
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 72530f0..780b34b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -30,8 +30,6 @@ import com.google.common.base.Preconditions;
 
 public class SyncEventHandler implements EventListener {
 	
-//	private static final Logger log = LoggerFactory.getLogger(SyncEventHandler.class);
-	
 	private final ClassLoader userCodeClassLoader;
 	
 	private final Map<String, Aggregator<?>> aggregators;
@@ -65,11 +63,7 @@ public class SyncEventHandler implements EventListener {
 		}
 		
 		workerDoneEventCounter++;
-		
-//		if (log.isInfoEnabled()) {
-//			log.info("Sync event handler received WorkerDoneEvent event (" + workerDoneEventCounter + ")");
-//		}
-		
+
 		String[] aggNames = workerDoneEvent.getAggregatorNames();
 		Value[] aggregates = workerDoneEvent.getAggregates(userCodeClassLoader);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index c503ef6..8ee4da4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -43,7 +43,7 @@ public class InputFormatVertex extends AbstractJobVertex {
 	public void initializeOnMaster(ClassLoader loader) throws Exception {
 		if (inputFormat == null) {
 			TaskConfig cfg = new TaskConfig(getConfiguration());
-			UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.<InputFormat<?, ?>>getStubWrapper(loader);
+			UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.getStubWrapper(loader);
 			
 			if (wrapper == null) {
 				throw new Exception("No input format present in InputFormatVertex's task configuration.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index be691f0..e2568a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -29,6 +29,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.configuration.Configuration;
@@ -266,8 +267,8 @@ public class JobGraph implements IOReadableWritable {
 			return Collections.emptyList();
 		}
 		
-		ArrayList<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
-		LinkedHashSet<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());
+		List<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
+		Set<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());
 		
 		// start by finding the vertices with no input edges
 		// and the ones with disconnected inputs (that refer to some standalone data set)
@@ -301,7 +302,7 @@ public class JobGraph implements IOReadableWritable {
 		return sorted;
 	}
 	
-	private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, ArrayList<AbstractJobVertex> target, LinkedHashSet<AbstractJobVertex> remaining) {
+	private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, List<AbstractJobVertex> target, Set<AbstractJobVertex> remaining) {
 		
 		// forward traverse over all produced data sets and all their consumers
 		for (IntermediateDataSet dataSet : start.getProducedDataSets()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4203bf99/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index 3d76921..293b380 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.scala.operators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.lang3.Validate;
@@ -166,7 +165,7 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 		genName.setLength(genName.length()-1);
 
 		TypeSerializer<IN> serializer = getInputType().createSerializer();
-		TypeSerializerFactory<IN> serializerFactory = null;
+		TypeSerializerFactory<IN> serializerFactory;
 		if (serializer.isStateful()) {
 			serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
 					serializer, getInputType().getTypeClass());
@@ -214,12 +213,9 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 
 			SingleInputSemanticProperties props = new SingleInputSemanticProperties();
 
-			for (int i = 0; i < logicalKeyPositions.length; i++) {
-				int keyField = logicalKeyPositions[i];
+			for (int keyField : logicalKeyPositions) {
 				boolean keyFieldUsedInAgg = false;
-
-				for (int k = 0; k < fields.length; k++) {
-					int aggField = fields[k];
+				for (int aggField : fields) {
 					if (keyField == aggField) {
 						keyFieldUsedInAgg = true;
 						break;
@@ -273,8 +269,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
-			for (int i = 0; i < aggFunctions.length; i++) {
-				aggFunctions[i].initializeAggregate();
+			for (AggregationFunction<Object> aggFunction : aggFunctions) {
+				aggFunction.initializeAggregate();
 			}
 			serializer = (TupleSerializerBase<T>)serializerFactory.getSerializer();
 		}
@@ -287,10 +283,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 			// aggregators are initialized from before
 
 			T current = null;
-			final Iterator<T> values = records.iterator();
-			while (values.hasNext()) {
-				current = values.next();
-
+			for (T record : records) {
+				current = record;
 				for (int i = 0; i < fieldPositions.length; i++) {
 					Object val = current.productElement(fieldPositions[i]);
 					aggFunctions[i].aggregate(val);