You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/30 13:09:45 UTC
[2/2] flink git commit: [FLINK-7736] Fix some lgtm.com alerts
[FLINK-7736] Fix some lgtm.com alerts
This closes #4784.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed14135b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed14135b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed14135b
Branch: refs/heads/master
Commit: ed14135b36704e779614f72e5955e476fee0cca0
Parents: b4e90fe
Author: Malcolm Taylor <ma...@semmle.com>
Authored: Sun Oct 8 08:06:17 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 30 14:08:01 2018 +0100
----------------------------------------------------------------------
.../flink/optimizer/dag/GroupReduceNode.java | 2 +-
.../webmonitor/handlers/JarListHandler.java | 3 +-
.../flink/runtime/memory/MemoryManager.java | 2 +-
.../operators/hash/InPlaceMutableHashTable.java | 4 +--
.../runtime/taskexecutor/TaskExecutor.java | 2 +-
.../taskexecutor/slot/TaskSlotTable.java | 2 +-
.../flink/runtime/util/JarFileCreator.java | 2 +-
.../webmonitor/history/ArchivedJson.java | 8 +++++-
.../webmonitor/history/ArchivedJsonTest.java | 10 +++++++
.../source/SocketTextStreamFunction.java | 29 ++++++++++----------
.../streaming/util/typeutils/FieldAccessor.java | 4 ---
.../flink/yarn/YarnApplicationMasterRunner.java | 2 +-
.../apache/flink/yarn/YarnResourceManager.java | 2 +-
13 files changed, 42 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index bd118ec..bef2c0b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -97,7 +97,7 @@ public class GroupReduceNode extends SingleInputNode {
// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
Ordering groupOrder = null;
- if (getOperator() instanceof GroupReduceOperatorBase) {
+ if (getOperator() != null) {
groupOrder = getOperator().getGroupOrder();
if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
groupOrder = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 2b56ecd..66cc7f5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -111,8 +111,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
gen.writeArrayFieldStart("entry");
String[] classes = new String[0];
- try {
- JarFile jar = new JarFile(f);
+ try (JarFile jar = new JarFile(f)) {
Manifest manifest = jar.getManifest();
String assemblerClass = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c1a98cf..f3bea87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -568,7 +568,7 @@ public class MemoryManager {
* @return The number of pages corresponding to the memory fraction.
*/
public long computeMemorySize(double fraction) {
- return pageSize * computeNumberOfPages(fraction);
+ return pageSize * (long) computeNumberOfPages(fraction);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
index bfc9aec..22a5d2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java
@@ -199,7 +199,7 @@ public class InPlaceMutableHashTable<T> extends AbstractMutableHashTable<T> {
* @return The hash table's total capacity.
*/
public long getCapacity() {
- return numAllMemorySegments * segmentSize;
+ return numAllMemorySegments * (long)segmentSize;
}
/**
@@ -562,7 +562,7 @@ public class InPlaceMutableHashTable<T> extends AbstractMutableHashTable<T> {
}
public long getTotalSize() {
- return segments.size() * segmentSize;
+ return segments.size() * (long)segmentSize;
}
// ----------------------- Output -----------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 0f98c49..30cf377 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -981,7 +981,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
Preconditions.checkNotNull(jobID);
Preconditions.checkNotNull(resourceID);
Preconditions.checkNotNull(jobMasterGateway);
- Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
+ Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index ab62a86..fcb2761 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -452,7 +452,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
}
} else {
- throw new SlotNotFoundException(taskSlot.getAllocationId());
+ throw new SlotNotFoundException(task.getAllocationId());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
index ad7906a..d77c9f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
@@ -190,7 +190,7 @@ public class JarFileCreator {
this.outputFile.delete();
}
- try ( JarOutputStream jos = new JarOutputStream(new FileOutputStream(this.outputFile), new Manifest())) {
+ try ( FileOutputStream fos = new FileOutputStream(this.outputFile); JarOutputStream jos = new JarOutputStream(fos, new Manifest())) {
final Iterator<Class<?>> it = this.classSet.iterator();
while (it.hasNext()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index 23e7676..a15dd52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.webmonitor.history;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.util.Preconditions;
+import java.util.Objects;
+
/**
* A simple container for a handler's JSON response and the REST URLs for which the response would've been returned.
*
@@ -56,6 +57,11 @@ public class ArchivedJson {
}
@Override
+ public int hashCode() {
+ return Objects.hash(path, json);
+ }
+
+ @Override
public String toString() {
return path + ":" + json;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
index 57ebbc9..05fe2eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java
@@ -38,4 +38,14 @@ public class ArchivedJsonTest {
Assert.assertNotEquals(original, identicalPath);
Assert.assertNotEquals(original, identicalJson);
}
+
+ @Test
+ public void testHashCode() {
+ ArchivedJson original = new ArchivedJson("path", "json");
+ ArchivedJson twin = new ArchivedJson("path", "json");
+
+ Assert.assertEquals(original, original);
+ Assert.assertEquals(original, twin);
+ Assert.assertEquals(original.hashCode(), twin.hashCode());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index 1552ee2..8d04257 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -94,21 +94,22 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
LOG.info("Connecting to server socket " + hostname + ':' + port);
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
- BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
- char[] cbuf = new char[8192];
- int bytesRead;
- while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
- buffer.append(cbuf, 0, bytesRead);
- int delimPos;
- while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
- String record = buffer.substring(0, delimPos);
- // truncate trailing carriage return
- if (delimiter.equals("\n") && record.endsWith("\r")) {
- record = record.substring(0, record.length() - 1);
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+
+ char[] cbuf = new char[8192];
+ int bytesRead;
+ while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
+ buffer.append(cbuf, 0, bytesRead);
+ int delimPos;
+ while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
+ String record = buffer.substring(0, delimPos);
+ // truncate trailing carriage return
+ if (delimiter.equals("\n") && record.endsWith("\r")) {
+ record = record.substring(0, record.length() - 1);
+ }
+ ctx.collect(record);
+ buffer.delete(0, delimPos + delimiter.length());
}
- ctx.collect(record);
- buffer.delete(0, delimPos + delimiter.length());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
index 3751670..411e4ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
@@ -204,10 +204,6 @@ public abstract class FieldAccessor<T, F> implements Serializable {
typeInfo.toString() + "\", which is an invalid index.");
}
- if (pos < 0) {
- throw new CompositeType.InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". field.");
- }
-
this.pos = pos;
this.innerAccessor = innerAccessor;
this.fieldType = innerAccessor.fieldType;
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index e97fac3..68c0aec 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -139,7 +139,7 @@ public class YarnApplicationMasterRunner {
LOG.debug("All environment variables: {}", ENV);
final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
- require(yarnClientUsername != null, "YARN client user name environment variable {} not set",
+ require(yarnClientUsername != null, "YARN client user name environment variable (%s) not set",
YarnConfigKeys.ENV_HADOOP_USER_NAME);
final String currDir = ENV.get(Environment.PWD.key());
http://git-wip-us.apache.org/repos/asf/flink/blob/ed14135b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 910172d..4e5cfce 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -292,7 +292,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
resourceManagerClient.releaseAssignedContainer(container.getId());
workerNodeMap.remove(workerNode.getResourceID());
} else {
- log.error("Can not find container with resource ID {}.", workerNode.getResourceID());
+ log.error("Can not find container for null workerNode.");
}
return true;
}