You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/11/28 03:32:02 UTC
[1/4] flink git commit: [FLINK-8142] [config] Cleanup references to
deprecated constants in ConfigConstants
Repository: flink
Updated Branches:
refs/heads/master 45c864071 -> d08b7fe4f
[FLINK-8142] [config] Cleanup references to deprecated constants in ConfigConstants
This closes #5067
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2b804a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2b804a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2b804a7
Branch: refs/heads/master
Commit: f2b804a7479dcba7980dd68a445635f4ac2198c0
Parents: 45c8640
Author: yew1eb <ye...@gmail.com>
Authored: Thu Nov 23 23:52:38 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Nov 27 15:15:39 2017 -0500
----------------------------------------------------------------------
.../connectors/kinesis/manualtests/ManualExactlyOnceTest.java | 3 ++-
.../manualtests/ManualExactlyOnceWithStreamReshardingTest.java | 3 ++-
.../runtime/executiongraph/restart/RestartStrategyFactory.java | 6 +++---
.../flink/runtime/taskmanager/TaskManagerStartupTest.java | 2 +-
.../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 4 ++--
.../java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +-
6 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b804a7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 7abcd3c..67ddad2 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
@@ -79,7 +80,7 @@ public class ManualExactlyOnceTest {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+ flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b804a7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 226ac3e..cef8720 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
@@ -90,7 +91,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+ flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b804a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index d1f547f..717e1d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -87,10 +87,10 @@ public abstract class RestartStrategyFactory implements Serializable {
switch (restartStrategyName.toLowerCase()) {
case "none":
// support deprecated ConfigConstants values
- final int numberExecutionRetries = configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY,
+ final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
- String delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
+ String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
pauseString);
long delay;
@@ -104,7 +104,7 @@ public abstract class RestartStrategyFactory implements Serializable {
". Value must be a valid duration (such as '10 s' or '1 min')");
} else {
throw new Exception("Invalid config value for " +
- ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString +
+ ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
". Value must be a valid duration (such as '100 milli' or '10 s')");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b804a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 2e6c580..ed06dc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -187,7 +187,7 @@ public class TaskManagerStartupTest extends TestLogger {
Configuration cfg = new Configuration();
cfg.setString(JobManagerOptions.ADDRESS, "localhost");
cfg.setInteger(JobManagerOptions.PORT, 21656);
- cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true");
+ cfg.setBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE, true);
// something invalid
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L);
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b804a7/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 5ac5c4e..468c0c8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -466,8 +466,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
int port = report.getRpcPort();
// Correctly initialize the Flink config
- flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
- flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+ flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
+ flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(
http://git-wip-us.apache.org/repos/asf/flink/blob/f2b804a7/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index ffa57f1..c903a76 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -118,7 +118,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
/**
* @deprecated Streaming mode has been deprecated without replacement. Set the
- * {@link ConfigConstants#TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY} configuration
+ * {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} configuration
* key to true to get the previous batch mode behaviour.
*/
@Deprecated
[2/4] flink git commit: [FLINK-8105] Remove "unnecessary 'null' check
before 'instanceof' expression"
Posted by gr...@apache.org.
[FLINK-8105] Remove "unnecessary 'null' check before 'instanceof' expression"
This closes #5034
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3561222c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3561222c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3561222c
Branch: refs/heads/master
Commit: 3561222c5d6c7cee79f8c5872f32227632135c48
Parents: f2b804a
Author: yew1eb <ye...@gmail.com>
Authored: Mon Nov 20 00:50:57 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Nov 27 15:27:20 2017 -0500
----------------------------------------------------------------------
.../flink/api/java/hadoop/mapred/HadoopInputFormatBase.java | 2 +-
.../flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java | 2 +-
.../java/org/apache/flink/storm/api/FlinkLocalCluster.java | 2 +-
.../java/org/apache/flink/api/common/io/BinaryInputFormat.java | 2 +-
.../org/apache/flink/api/common/io/DelimitedInputFormat.java | 2 +-
.../java/org/apache/flink/api/common/io/FileInputFormat.java | 2 +-
.../api/java/typeutils/runtime/RuntimeSerializerFactory.java | 2 +-
.../src/main/java/org/apache/flink/core/fs/FileInputSplit.java | 2 +-
.../main/java/org/apache/flink/core/io/GenericInputSplit.java | 2 +-
.../java/org/apache/flink/core/io/LocatableInputSplit.java | 2 +-
.../main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java | 2 +-
.../java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java | 2 +-
.../apache/flink/api/java/io/CollectionInputFormatTest.java | 2 +-
.../flink/optimizer/dataproperties/GlobalProperties.java | 2 +-
.../flink/optimizer/dataproperties/InterestingProperties.java | 2 +-
.../optimizer/dataproperties/RequestedGlobalProperties.java | 2 +-
.../flink/optimizer/plantranslate/JobGraphGenerator.java | 5 ++---
.../org/apache/flink/optimizer/traversals/PlanFinalizer.java | 6 +++---
.../flink/runtime/taskmanager/TaskManagerRegistrationTest.java | 2 +-
.../api/functions/source/TimestampedFileInputSplit.java | 2 +-
.../streaming/runtime/operators/GenericWriteAheadSink.java | 2 +-
21 files changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index 27a477c..5c26a58 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -121,7 +121,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
return null;
}
- final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+ final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 06205e9..6734d5f 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -119,7 +119,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
JobContext jobContext = new JobContextImpl(configuration, null);
- final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+ final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 364c4d5..bff8c80 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -73,7 +73,7 @@ public class FlinkLocalCluster {
boolean submitBlocking = false;
if (conf != null) {
Object blockingFlag = conf.get(SUBMIT_BLOCKING);
- if (blockingFlag != null && blockingFlag instanceof Boolean) {
+ if (blockingFlag instanceof Boolean) {
submitBlocking = ((Boolean) blockingFlag).booleanValue();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index d45a767..7301b91 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -169,7 +169,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
@Override
public SequentialStatistics getStatistics(BaseStatistics cachedStats) {
- final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+ final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
(FileBaseStatistics) cachedStats : null;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 4d715e7..1d344b9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -345,7 +345,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
@Override
public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
- final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+ final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
(FileBaseStatistics) cachedStats : null;
// store properties
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 1c8e7ff..f43bd22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -379,7 +379,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
@Override
public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
- final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+ final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
(FileBaseStatistics) cachedStats : null;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
index f03a7ab..dfb9ea8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
@@ -114,7 +114,7 @@ public final class RuntimeSerializerFactory<T> implements TypeSerializerFactory<
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof RuntimeSerializerFactory) {
+ if (obj instanceof RuntimeSerializerFactory) {
RuntimeSerializerFactory<?> other = (RuntimeSerializerFactory<?>) obj;
return this.clazz == other.clazz &&
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index bef13fa..b53ac4b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -104,7 +104,7 @@ public class FileInputSplit extends LocatableInputSplit {
if (obj == this) {
return true;
}
- else if (obj != null && obj instanceof FileInputSplit && super.equals(obj)) {
+ else if (obj instanceof FileInputSplit && super.equals(obj)) {
FileInputSplit other = (FileInputSplit) obj;
return this.start == other.start &&
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
index 5c7bd00..fdc6406 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
@@ -67,7 +67,7 @@ public class GenericInputSplit implements InputSplit, java.io.Serializable {
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof GenericInputSplit) {
+ if (obj instanceof GenericInputSplit) {
GenericInputSplit other = (GenericInputSplit) obj;
return this.partitionNumber == other.partitionNumber &&
this.totalNumberOfPartitions == other.totalNumberOfPartitions;
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
index 21a5093..b71521c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
@@ -90,7 +90,7 @@ public class LocatableInputSplit implements InputSplit, java.io.Serializable {
if (obj == this) {
return true;
}
- else if (obj != null && obj instanceof LocatableInputSplit) {
+ else if (obj instanceof LocatableInputSplit) {
LocatableInputSplit other = (LocatableInputSplit) obj;
return other.splitNumber == this.splitNumber && Arrays.deepEquals(other.hostnames, this.hostnames);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
index 0a0f0f9..e485801 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
@@ -331,7 +331,7 @@ public class UdfAnalyzer {
while (cause != null && !(cause instanceof CodeErrorException)) {
cause = cause.getCause();
}
- if ((cause != null && cause instanceof CodeErrorException) || e instanceof CodeErrorException) {
+ if (cause instanceof CodeErrorException || e instanceof CodeErrorException) {
throw new CodeErrorException("Function code contains obvious errors. " +
"If you think the code analysis is wrong at this point you can " +
"disable the entire code analyzer in ExecutionConfig or add" +
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index 8a76ed2..1c0af81 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -155,7 +155,7 @@ public final class UdfAnalyzerUtils {
}
public static boolean isTagged(Value value) {
- return value != null && value instanceof TaggedValue;
+ return value instanceof TaggedValue;
}
public static TaggedValue tagged(Value value) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 77945cc..579e761 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -70,7 +70,7 @@ public class CollectionInputFormatTest {
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof ElementType) {
+ if (obj instanceof ElementType) {
ElementType et = (ElementType) obj;
return et.getId() == this.getId();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index 654b054..dffea8e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -456,7 +456,7 @@ public class GlobalProperties implements Cloneable {
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof GlobalProperties) {
+ if (obj instanceof GlobalProperties) {
final GlobalProperties other = (GlobalProperties) obj;
return (this.partitioning == other.partitioning)
&& (this.ordering == other.ordering || (this.ordering != null && this.ordering.equals(other.ordering)))
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
index 6946641..1bd0e2f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
@@ -148,7 +148,7 @@ public class InterestingProperties implements Cloneable {
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof InterestingProperties) {
+ if (obj instanceof InterestingProperties) {
InterestingProperties other = (InterestingProperties) obj;
return this.globalProps.equals(other.globalProps) &&
this.localProps.equals(other.localProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
index 3646d74..5adeb19 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
@@ -448,7 +448,7 @@ public final class RequestedGlobalProperties implements Cloneable {
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof RequestedGlobalProperties) {
+ if (obj instanceof RequestedGlobalProperties) {
RequestedGlobalProperties other = (RequestedGlobalProperties) obj;
return (ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering())))
&& (partitioning == other.getPartitioning())
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index f7f9314..e3dcfad 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -810,8 +810,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// cannot chain the nodes that produce the next workset or the next solution set, if they are not the
// in a tail
- if (this.currentIteration != null && this.currentIteration instanceof WorksetIterationPlanNode &&
- node.getOutgoingChannels().size() > 0)
+ if (this.currentIteration instanceof WorksetIterationPlanNode && node.getOutgoingChannels().size() > 0)
{
WorksetIterationPlanNode wspn = (WorksetIterationPlanNode) this.currentIteration;
if (wspn.getSolutionSetDeltaPlanNode() == pred || wspn.getNextWorkSetPlanNode() == pred) {
@@ -819,7 +818,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
}
// cannot chain the nodes that produce the next workset in a bulk iteration if a termination criterion follows
- if (this.currentIteration != null && this.currentIteration instanceof BulkIterationPlanNode)
+ if (this.currentIteration instanceof BulkIterationPlanNode)
{
BulkIterationPlanNode wspn = (BulkIterationPlanNode) this.currentIteration;
if (node == wspn.getRootOfTerminationCriterion() && wspn.getRootOfStepFunction() == pred){
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
index 58aa3c1..b3b7cf9 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
@@ -147,7 +147,7 @@ public class PlanFinalizer implements Visitor<PlanNode> {
final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
// sanity check!
- if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
+ if (!(iteration instanceof BulkIterationPlanNode)) {
throw new CompilerException("Bug: Error finalizing the plan. " +
"Cannot associate the node for a partial solutions with its containing iteration.");
}
@@ -159,7 +159,7 @@ public class PlanFinalizer implements Visitor<PlanNode> {
final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
// sanity check!
- if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+ if (!(iteration instanceof WorksetIterationPlanNode)) {
throw new CompilerException("Bug: Error finalizing the plan. " +
"Cannot associate the node for a partial solutions with its containing iteration.");
}
@@ -171,7 +171,7 @@ public class PlanFinalizer implements Visitor<PlanNode> {
final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
// sanity check!
- if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+ if (!(iteration instanceof WorksetIterationPlanNode)) {
throw new CompilerException("Bug: Error finalizing the plan. " +
"Cannot associate the node for a partial solutions with its containing iteration.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 3953072..986f3fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -545,7 +545,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
// we might also receive RegisterTaskManager and Heartbeat messages which
// are queued up in the testing actor's mailbox
- while(message == null || !(message instanceof Terminated)) {
+ while(!(message instanceof Terminated)) {
message = receiveOne(timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
index 4111d91..201533c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -119,7 +119,7 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara
public boolean equals(Object o) {
if (this == o) {
return true;
- } else if (o != null && o instanceof TimestampedFileInputSplit && super.equals(o)) {
+ } else if (o instanceof TimestampedFileInputSplit && super.equals(o)) {
TimestampedFileInputSplit that = (TimestampedFileInputSplit) o;
return this.modificationTime == that.modificationTime;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3561222c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 370d021..291f892 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -303,7 +303,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
@Override
public boolean equals(Object o) {
- if (o == null || !(o instanceof GenericWriteAheadSink.PendingCheckpoint)) {
+ if (!(o instanceof GenericWriteAheadSink.PendingCheckpoint)) {
return false;
}
PendingCheckpoint other = (PendingCheckpoint) o;
[3/4] flink git commit: [FLINK-7967] [config] Deprecate Hadoop
specific Flink configuration options
Posted by gr...@apache.org.
[FLINK-7967] [config] Deprecate Hadoop specific Flink configuration options
This closes #4946
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fae83c0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fae83c0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fae83c0b
Branch: refs/heads/master
Commit: fae83c0b0c0a711d5d0f9de0b95fb6b703c4f912
Parents: 3561222
Author: zhangminglei <zm...@163.com>
Authored: Thu Nov 23 10:20:16 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Nov 27 22:30:43 2017 -0500
----------------------------------------------------------------------
docs/ops/config.md | 8 +++++---
.../org/apache/flink/configuration/ConfigConstants.java | 11 ++++++++++-
flink-dist/src/main/resources/flink-conf.yaml | 3 +++
3 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fae83c0b/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index ed65880..6f15d01 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -72,8 +72,6 @@ definition. As another example, if this is set to `hdfs://localhost:9000/`, then
without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed into
`hdfs://localhost:9000/user/USERNAME/in.txt`. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
-- `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory.
-
- `classloader.resolve-order`: Whether Flink should use a child-first `ClassLoader` when loading
user-code classes or a parent-first `ClassLoader`. Can be one of `parent-first` or `child-first`. (default: `child-first`)
@@ -246,9 +244,13 @@ Default value is the `akka.ask.timeout`.
### HDFS
+<div class="alert alert-warning">
+ <strong>Note:</strong> These keys are deprecated and it is recommended to configure the Hadoop path with the environment variable *HADOOP_CONF_DIR* instead.
+</div>
+
These parameters configure the default HDFS used by Flink. Setups that do not specify an HDFS configuration have to specify the full path to HDFS files (`hdfs://address:port/path/to/files`). Files will also be written with default HDFS parameters (block size, replication factor).
-- `fs.hdfs.hadoopconf`: The absolute path to the Hadoop configuration directory. The system will look for the "core-site.xml" and "hdfs-site.xml" files in that directory (DEFAULT: null).
+- `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory.
- `fs.hdfs.hdfsdefault`: The absolute path of Hadoop's own configuration file "hdfs-default.xml" (DEFAULT: null).
http://git-wip-us.apache.org/repos/asf/flink/blob/fae83c0b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f80bd9b..5fd7085 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -595,19 +595,28 @@ public final class ConfigConstants {
/**
* Path to hdfs-default.xml file
+ *
+ * @deprecated Use environment variable HADOOP_CONF_DIR instead.
*/
+ @Deprecated
public static final String HDFS_DEFAULT_CONFIG = "fs.hdfs.hdfsdefault";
/**
* Path to hdfs-site.xml file
+ *
+ * @deprecated Use environment variable HADOOP_CONF_DIR instead.
*/
+ @Deprecated
public static final String HDFS_SITE_CONFIG = "fs.hdfs.hdfssite";
/**
* Path to Hadoop configuration
+ *
+ * @deprecated Use environment variable HADOOP_CONF_DIR instead.
*/
+ @Deprecated
public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
-
+
// ------------------------ File System Behavior ------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/fae83c0b/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index ddff9c8..fca97df 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -151,6 +151,9 @@ web.port: 8081
# Path to the Hadoop configuration directory.
#
+# Note: these keys are deprecated and it is recommended to configure the Hadoop
+# path with the environment variable 'HADOOP_CONF_DIR' instead.
+#
# This configuration is used when writing into HDFS. Unless specified otherwise,
# HDFS file creation will use HDFS default settings with respect to block-size,
# replication factor, etc.
[4/4] flink git commit: [hotfix] [tests] Use G1GC for tests
Posted by gr...@apache.org.
[hotfix] [tests] Use G1GC for tests
This closes #4748
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d08b7fe4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d08b7fe4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d08b7fe4
Branch: refs/heads/master
Commit: d08b7fe4fb7db3e68647b971e95051dece67c2be
Parents: fae83c0
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Sep 27 18:38:42 2017 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Nov 27 22:30:47 2017 -0500
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d08b7fe4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f010b6c..2b74c96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1109,7 +1109,7 @@ under the License.
<forkNumber>0${surefire.forkNumber}</forkNumber>
<log4j.configuration>${log4j.configuration}</log4j.configuration>
</systemPropertyVariables>
- <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseSerialGC</argLine>
+ <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC</argLine>
</configuration>
<executions>
<!--execute all the unit tests-->