You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/30 13:09:45 UTC

[2/2] flink git commit: [FLINK-7736] Fix some lgtm.com alerts

[FLINK-7736] Fix some lgtm.com alerts

This closes #4784.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed14135b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed14135b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed14135b

Branch: refs/heads/master
Commit: ed14135b36704e779614f72e5955e476fee0cca0
Parents: b4e90fe
Author: Malcolm Taylor <ma...@semmle.com>
Authored: Sun Oct 8 08:06:17 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 30 14:08:01 2018 +0100

----------------------------------------------------------------------
 .../flink/optimizer/dag/GroupReduceNode.java    |  2 +-
 .../webmonitor/handlers/JarListHandler.java     |  3 +-
 .../flink/runtime/memory/MemoryManager.java     |  2 +-
 .../operators/hash/InPlaceMutableHashTable.java |  4 +--
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../taskexecutor/slot/TaskSlotTable.java        |  2 +-
 .../flink/runtime/util/JarFileCreator.java      |  2 +-
 .../webmonitor/history/ArchivedJson.java        |  8 +++++-
 .../webmonitor/history/ArchivedJsonTest.java    | 10 +++++++
 .../source/SocketTextStreamFunction.java        | 29 ++++++++++----------
 .../streaming/util/typeutils/FieldAccessor.java |  4 ---
 .../flink/yarn/YarnApplicationMasterRunner.java |  2 +-
 .../apache/flink/yarn/YarnResourceManager.java  |  2 +-
 13 files changed, 42 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index bd118ec..bef2c0b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -97,7 +97,7 @@ public class GroupReduceNode extends SingleInputNode {
 		
 		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
 		Ordering groupOrder = null;
-		if (getOperator() instanceof GroupReduceOperatorBase) {
+		if (getOperator() != null) {
 			groupOrder = getOperator().getGroupOrder();
 			if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
 				groupOrder = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 2b56ecd..66cc7f5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -111,8 +111,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 						gen.writeArrayFieldStart("entry");
 
 						String[] classes = new String[0];
-						try {
-							JarFile jar = new JarFile(f);
+						try (JarFile jar = new JarFile(f)) {
 							Manifest manifest = jar.getManifest();
 							String assemblerClass = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c1a98cf..f3bea87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -568,7 +568,7 @@ public class MemoryManager {
 	 * @return The number of pages corresponding to the memory fraction.
 	 */
 	public long computeMemorySize(double fraction) {
-		return pageSize * computeNumberOfPages(fraction);
+		return pageSize * (long) computeNumberOfPages(fraction);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
index bfc9aec..22a5d2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
@@ -199,7 +199,7 @@ public class InPlaceMutableHashTable<T> extends AbstractMutableHashTable<T> {
 	 * @return The hash table's total capacity.
 	 */
 	public long getCapacity() {
-		return numAllMemorySegments * segmentSize;
+		return numAllMemorySegments * (long)segmentSize;
 	}
 
 	/**
@@ -562,7 +562,7 @@ public class InPlaceMutableHashTable<T> extends AbstractMutableHashTable<T> {
 		}
 
 		public long getTotalSize() {
-			return segments.size() * segmentSize;
+			return segments.size() * (long)segmentSize;
 		}
 
 		// ----------------------- Output -----------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 0f98c49..30cf377 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -981,7 +981,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		Preconditions.checkNotNull(jobID);
 		Preconditions.checkNotNull(resourceID);
 		Preconditions.checkNotNull(jobMasterGateway);
-		Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
+		Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
 
 		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index ab62a86..fcb2761 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -452,7 +452,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 				throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
 			}
 		} else {
-			throw new SlotNotFoundException(taskSlot.getAllocationId());
+			throw new SlotNotFoundException(task.getAllocationId());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
index ad7906a..d77c9f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
@@ -190,7 +190,7 @@ public class JarFileCreator {
 			this.outputFile.delete();
 		}
 
-		try ( JarOutputStream jos = new JarOutputStream(new FileOutputStream(this.outputFile), new Manifest())) {
+		try ( FileOutputStream fos = new FileOutputStream(this.outputFile); JarOutputStream jos = new JarOutputStream(fos, new Manifest())) {
 			final Iterator<Class<?>> it = this.classSet.iterator();
 			while (it.hasNext()) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index 23e7676..a15dd52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Objects;
+
 /**
  * A simple container for a handler's JSON response and the REST URLs for which the response would've been returned.
  *
@@ -56,6 +57,11 @@ public class ArchivedJson {
 	}
 
 	@Override
+	public int hashCode() {
+		return Objects.hash(path, json);
+	}
+
+	@Override
 	public String toString() {
 		return path + ":" + json;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
index 57ebbc9..05fe2eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
@@ -38,4 +38,14 @@ public class ArchivedJsonTest {
 		Assert.assertNotEquals(original, identicalPath);
 		Assert.assertNotEquals(original, identicalJson);
 	}
+
+	@Test
+	public void testHashCode() {
+		ArchivedJson original = new ArchivedJson("path", "json");
+		ArchivedJson twin = new ArchivedJson("path", "json");
+
+		Assert.assertEquals(original, original);
+		Assert.assertEquals(original, twin);
+		Assert.assertEquals(original.hashCode(), twin.hashCode());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index 1552ee2..8d04257 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -94,21 +94,22 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
 
 				LOG.info("Connecting to server socket " + hostname + ':' + port);
 				socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
-				BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
-				char[] cbuf = new char[8192];
-				int bytesRead;
-				while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
-					buffer.append(cbuf, 0, bytesRead);
-					int delimPos;
-					while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
-						String record = buffer.substring(0, delimPos);
-						// truncate trailing carriage return
-						if (delimiter.equals("\n") && record.endsWith("\r")) {
-							record = record.substring(0, record.length() - 1);
+				try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+
+					char[] cbuf = new char[8192];
+					int bytesRead;
+					while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
+						buffer.append(cbuf, 0, bytesRead);
+						int delimPos;
+						while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
+							String record = buffer.substring(0, delimPos);
+							// truncate trailing carriage return
+							if (delimiter.equals("\n") && record.endsWith("\r")) {
+								record = record.substring(0, record.length() - 1);
+							}
+							ctx.collect(record);
+							buffer.delete(0, delimPos + delimiter.length());
 						}
-						ctx.collect(record);
-						buffer.delete(0, delimPos + delimiter.length());
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
index 3751670..411e4ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
@@ -204,10 +204,6 @@ public abstract class FieldAccessor<T, F> implements Serializable {
 						typeInfo.toString() + "\", which is an invalid index.");
 			}
 
-			if (pos < 0) {
-				throw new CompositeType.InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". field.");
-			}
-
 			this.pos = pos;
 			this.innerAccessor = innerAccessor;
 			this.fieldType = innerAccessor.fieldType;

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index e97fac3..68c0aec 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -139,7 +139,7 @@ public class YarnApplicationMasterRunner {
 			LOG.debug("All environment variables: {}", ENV);
 
 			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-			require(yarnClientUsername != null, "YARN client user name environment variable {} not set",
+			require(yarnClientUsername != null, "YARN client user name environment variable (%s) not set",
 				YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
 			final String currDir = ENV.get(Environment.PWD.key());

http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 910172d..4e5cfce 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -292,7 +292,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			resourceManagerClient.releaseAssignedContainer(container.getId());
 			workerNodeMap.remove(workerNode.getResourceID());
 		} else {
-			log.error("Can not find container with resource ID {}.", workerNode.getResourceID());
+			log.error("Can not find container for null workerNode.");
 		}
 		return true;
 	}