You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/04 12:35:24 UTC
[1/7] flink git commit: [FLINK-6880] [runtime] Activate checkstyle
for runtime/iterative
Repository: flink
Updated Branches:
refs/heads/master 41806ba68 -> 85853edab
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
index 10e0b11..341cff0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
@@ -18,13 +18,6 @@
package org.apache.flink.runtime.iterative.event;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -35,54 +28,63 @@ import org.apache.flink.types.Value;
import org.junit.Assert;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+/**
+ * Tests for {@link IterationEventWithAggregators}.
+ */
public class EventWithAggregatorsTest {
-
+
private ClassLoader cl = ClassLoader.getSystemClassLoader();
-
+
@Test
public void testSerializationOfEmptyEvent() {
AllWorkersDoneEvent e = new AllWorkersDoneEvent();
IterationEventWithAggregators deserialized = pipeThroughSerialization(e);
-
+
Assert.assertEquals(0, deserialized.getAggregatorNames().length);
Assert.assertEquals(0, deserialized.getAggregates(cl).length);
}
-
+
@Test
public void testSerializationOfEventWithAggregateValues() {
StringValue stringValue = new StringValue("test string");
LongValue longValue = new LongValue(68743254);
-
+
String stringValueName = "stringValue";
String longValueName = "longValue";
-
+
Aggregator<StringValue> stringAgg = new TestAggregator<StringValue>(stringValue);
Aggregator<LongValue> longAgg = new TestAggregator<LongValue>(longValue);
-
+
Map<String, Aggregator<?>> aggMap = new HashMap<String, Aggregator<?>>();
aggMap.put(stringValueName, stringAgg);
aggMap.put(longValueName, longAgg);
-
+
Set<String> allNames = new HashSet<String>();
allNames.add(stringValueName);
allNames.add(longValueName);
-
+
Set<Value> allVals = new HashSet<Value>();
allVals.add(stringValue);
allVals.add(longValue);
-
+
// run the serialization
AllWorkersDoneEvent e = new AllWorkersDoneEvent(aggMap);
IterationEventWithAggregators deserialized = pipeThroughSerialization(e);
-
+
// verify the result
String[] names = deserialized.getAggregatorNames();
Value[] aggregates = deserialized.getAggregates(cl);
-
+
Assert.assertEquals(allNames.size(), names.length);
Assert.assertEquals(allVals.size(), aggregates.length);
-
+
// check that all the correct names and values are returned
for (String s : names) {
allNames.remove(s);
@@ -90,24 +92,24 @@ public class EventWithAggregatorsTest {
for (Value v : aggregates) {
allVals.remove(v);
}
-
+
Assert.assertTrue(allNames.isEmpty());
Assert.assertTrue(allVals.isEmpty());
}
-
+
private IterationEventWithAggregators pipeThroughSerialization(IterationEventWithAggregators event) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
event.write(new DataOutputViewStreamWrapper(baos));
-
+
byte[] data = baos.toByteArray();
baos.close();
-
+
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(new ByteArrayInputStream(data));
IterationEventWithAggregators newEvent = event.getClass().newInstance();
newEvent.read(in);
in.close();
-
+
return newEvent;
}
catch (Exception e) {
@@ -117,14 +119,13 @@ public class EventWithAggregatorsTest {
return null;
}
}
-
+
private static class TestAggregator<T extends Value> implements Aggregator<T> {
private static final long serialVersionUID = 1L;
-
+
private final T val;
-
-
+
public TestAggregator(T val) {
this.val = val;
}
[4/7] flink git commit: [FLINK-6877] [runtime] Activate checkstyle
for runtime/security
Posted by ch...@apache.org.
[FLINK-6877] [runtime] Activate checkstyle for runtime/security
This closes #4095.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31ad8020
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31ad8020
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31ad8020
Branch: refs/heads/master
Commit: 31ad80206f1fee234848f1288d493ee9428d309e
Parents: 834c527
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 09:58:24 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:25 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../runtime/security/DynamicConfiguration.java | 18 +++++------
.../runtime/security/HadoopSecurityContext.java | 1 +
.../flink/runtime/security/KerberosUtils.java | 15 ++++-----
.../runtime/security/NoOpSecurityContext.java | 1 +
.../flink/runtime/security/SecurityContext.java | 1 +
.../flink/runtime/security/SecurityUtils.java | 32 +++++++++++---------
.../runtime/security/modules/HadoopModule.java | 9 ++++--
.../runtime/security/modules/JaasModule.java | 15 +++++----
.../security/modules/SecurityModule.java | 1 +
.../security/modules/ZooKeeperModule.java | 7 +++--
.../runtime/security/KerberosUtilsTest.java | 2 --
.../runtime/security/SecurityUtilsTest.java | 2 ++
13 files changed, 59 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index df905dc..80f95a5 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -455,7 +455,6 @@ under the License.
**/runtime/registration/**,
**/runtime/resourcemanager/**,
**/runtime/rpc/**,
- **/runtime/security/**,
**/runtime/state/**,
**/runtime/taskexecutor/**,
**/runtime/taskmanager/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
index 6af4f23..da1c9c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security;
import org.slf4j.Logger;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -31,7 +33,7 @@ import java.util.Map;
/**
* A dynamic JAAS configuration.
*
- * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
+ * <p>Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
* an (optional) underlying configuration. Entries from the underlying configuration take
* precedence over dynamic entries.
*/
@@ -41,7 +43,7 @@ public class DynamicConfiguration extends Configuration {
private final Configuration delegate;
- private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
+ private final Map<String, AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
/**
* Create a dynamic configuration.
@@ -57,7 +59,7 @@ public class DynamicConfiguration extends Configuration {
public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
final AppConfigurationEntry[] existing = dynamicEntries.get(name);
final AppConfigurationEntry[] updated;
- if(existing == null) {
+ if (existing == null) {
updated = Arrays.copyOf(entry, entry.length);
}
else {
@@ -70,8 +72,6 @@ public class DynamicConfiguration extends Configuration {
* Retrieve the AppConfigurationEntries for the specified <i>name</i>
* from this Configuration.
*
- * <p>
- *
* @param name the name used to index the Configuration.
*
* @return an array of AppConfigurationEntries for the specified <i>name</i>
@@ -81,12 +81,12 @@ public class DynamicConfiguration extends Configuration {
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
AppConfigurationEntry[] entry = null;
- if(delegate != null) {
+ if (delegate != null) {
entry = delegate.getAppConfigurationEntry(name);
}
final AppConfigurationEntry[] existing = dynamicEntries.get(name);
- if(existing != null) {
- if(entry != null) {
+ if (existing != null) {
+ if (entry != null) {
entry = merge(entry, existing);
}
else {
@@ -104,7 +104,7 @@ public class DynamicConfiguration extends Configuration {
@Override
public void refresh() {
- if(delegate != null) {
+ if (delegate != null) {
delegate.refresh();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
index c70f00b..7b0bb06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.security;
import org.apache.flink.util.Preconditions;
+
import org.apache.hadoop.security.UserGroupInformation;
import java.security.PrivilegedExceptionAction;
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
index 7ef9187..5662d29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -19,11 +19,13 @@
package org.apache.flink.runtime.security;
import org.apache.flink.annotation.Internal;
+
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.AppConfigurationEntry;
+
import java.io.File;
import java.util.HashMap;
import java.util.Map;
@@ -31,10 +33,9 @@ import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- *
* Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances.
*
- * The implementation is inspired from Hadoop UGI class.
+ * <p>The implementation is inspired from Hadoop UGI class.
*/
@Internal
public class KerberosUtils {
@@ -55,11 +56,11 @@ public class KerberosUtils {
IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
debugOptions.put("debug", "true");
}
- if(IBM_JAVA) {
+ if (IBM_JAVA) {
kerberosCacheOptions.put("useDefaultCcache", "true");
} else {
kerberosCacheOptions.put("doNotPrompt", "true");
@@ -67,8 +68,8 @@ public class KerberosUtils {
}
String ticketCache = System.getenv("KRB5CCNAME");
- if(ticketCache != null) {
- if(IBM_JAVA) {
+ if (ticketCache != null) {
+ if (IBM_JAVA) {
System.setProperty("KRB5CCNAME", ticketCache);
} else {
kerberosCacheOptions.put("ticketCache", ticketCache);
@@ -96,7 +97,7 @@ public class KerberosUtils {
Map<String, String> keytabKerberosOptions = new HashMap<>();
- if(IBM_JAVA) {
+ if (IBM_JAVA) {
keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
keytabKerberosOptions.put("credsType", "both");
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
index 4574db5..02f97da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security;
import java.util.concurrent.Callable;
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 02892d3..63ca728 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security;
import java.util.concurrent.Callable;
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index b874009..df49822 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.security;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration;
@@ -29,6 +27,9 @@ import org.apache.flink.runtime.security.modules.HadoopModule;
import org.apache.flink.runtime.security.modules.JaasModule;
import org.apache.flink.runtime.security.modules.SecurityModule;
import org.apache.flink.runtime.security.modules.ZooKeeperModule;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +42,8 @@ import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/*
+/**
* Utils for configuring security. The following security subsystems are supported:
- *
* 1. Java Authentication and Authorization Service (JAAS)
* 2. Hadoop's User Group Information (UGI)
* 3. ZooKeeper's process-wide security settings.
@@ -56,7 +56,9 @@ public class SecurityUtils {
private static List<SecurityModule> installedModules = null;
- public static SecurityContext getInstalledContext() { return installedContext; }
+ public static SecurityContext getInstalledContext() {
+ return installedContext;
+ }
@VisibleForTesting
static List<SecurityModule> getInstalledModules() {
@@ -66,7 +68,7 @@ public class SecurityUtils {
/**
* Installs a process-wide security configuration.
*
- * Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
+ * <p>Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
*/
public static void install(SecurityConfiguration config) throws Exception {
@@ -79,7 +81,7 @@ public class SecurityUtils {
modules.add(module);
}
}
- catch(Exception ex) {
+ catch (Exception ex) {
throw new Exception("unable to establish the security context", ex);
}
installedModules = modules;
@@ -94,14 +96,14 @@ public class SecurityUtils {
}
static void uninstall() {
- if(installedModules != null) {
+ if (installedModules != null) {
for (SecurityModule module : Lists.reverse(installedModules)) {
try {
module.uninstall();
}
- catch(UnsupportedOperationException ignored) {
+ catch (UnsupportedOperationException ignored) {
}
- catch(SecurityModule.SecurityInstallException e) {
+ catch (SecurityModule.SecurityInstallException e) {
LOG.warn("unable to uninstall a security module", e);
}
}
@@ -114,7 +116,7 @@ public class SecurityUtils {
/**
* The global security configuration.
*
- * See {@link SecurityOptions} for corresponding configuration options.
+ * <p>See {@link SecurityOptions} for corresponding configuration options.
*/
public static class SecurityConfiguration {
@@ -215,22 +217,22 @@ public class SecurityUtils {
}
private void validate() {
- if(!StringUtils.isBlank(keytab)) {
+ if (!StringUtils.isBlank(keytab)) {
// principal is required
- if(StringUtils.isBlank(principal)) {
+ if (StringUtils.isBlank(principal)) {
throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
}
// check the keytab is readable
File keytabFile = new File(keytab);
- if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+ if (!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
}
}
}
private static List<String> parseList(String value) {
- if(value == null || value.isEmpty()) {
+ if (value == null || value.isEmpty()) {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 5c62272..05be314 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security.modules;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.runtime.security.SecurityUtils;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -29,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
+
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -75,11 +78,11 @@ public class HadoopModule implements SecurityModule {
// and does not fallback to using Kerberos tickets
Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
Credentials credentials = new Credentials();
- final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+ final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
//If UGI use keytab for login, do not load HDFS delegation token.
for (Token<? extends TokenIdentifier> token : usrTok) {
- if (!token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+ if (!token.getKind().equals(hdfsDelegationTokenKind)) {
final Text id = new Text(token.getIdentifier());
credentials.addToken(id, token);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
index f8b9bdf..91411a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -15,16 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security.modules;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.security.DynamicConfiguration;
import org.apache.flink.runtime.security.KerberosUtils;
import org.apache.flink.runtime.security.SecurityUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.AppConfigurationEntry;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -34,13 +37,13 @@ import java.nio.file.StandardCopyOption;
/**
* Responsible for installing a process-wide JAAS configuration.
- * <p>
- * The installed configuration combines login modules based on:
+ *
+ * <p>The installed configuration combines login modules based on:
* - the user-supplied JAAS configuration file, if any
* - a Kerberos keytab, if configured
* - any cached Kerberos credentials from the current environment
- * <p>
- * The module also installs a default JAAS config file (if necessary) for
+ *
+ * <p>The module also installs a default JAAS config file (if necessary) for
* compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations.
* See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
* See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
@@ -78,7 +81,7 @@ public class JaasModule implements SecurityModule {
// wire up the configured JAAS login contexts to use the krb5 entries
AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
- if(krb5Entries != null) {
+ if (krb5Entries != null) {
for (String app : securityConfig.getLoginContextNames()) {
currentConfig.addAppConfigurationEntry(app, krb5Entries);
}
@@ -89,7 +92,7 @@ public class JaasModule implements SecurityModule {
@Override
public void uninstall() throws SecurityInstallException {
- if(priorConfigFile != null) {
+ if (priorConfigFile != null) {
System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
} else {
System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
index fbe1db9..1a335df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security.modules;
import org.apache.flink.runtime.security.SecurityUtils;
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
index 216bdde..af2c1f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security.modules;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -66,17 +67,17 @@ public class ZooKeeperModule implements SecurityModule {
@Override
public void uninstall() throws SecurityInstallException {
- if(priorSaslEnable != null) {
+ if (priorSaslEnable != null) {
System.setProperty(ZK_ENABLE_CLIENT_SASL, priorSaslEnable);
} else {
System.clearProperty(ZK_ENABLE_CLIENT_SASL);
}
- if(priorServiceName != null) {
+ if (priorServiceName != null) {
System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
} else {
System.clearProperty(ZK_SASL_CLIENT_USERNAME);
}
- if(priorLoginContextName != null) {
+ if (priorLoginContextName != null) {
System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
} else {
System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
index 4c899e8..58817db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
@@ -18,12 +18,10 @@
package org.apache.flink.runtime.security;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.junit.Test;
import javax.security.auth.login.AppConfigurationEntry;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 3e3808b..c179f6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.security;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.modules.SecurityModule;
+
import org.junit.AfterClass;
import org.junit.Test;
[2/7] flink git commit: [FLINK-6880] [runtime] Activate checkstyle
for runtime/iterative
Posted by ch...@apache.org.
[FLINK-6880] [runtime] Activate checkstyle for runtime/iterative
This closes #4098.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/834c5277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/834c5277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/834c5277
Branch: refs/heads/master
Commit: 834c527783346f777bf5def4c61f1791b2a89473
Parents: 343a804
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 12:53:41 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 11:37:12 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../concurrent/BlockingBackChannel.java | 13 ++-
.../concurrent/BlockingBackChannelBroker.java | 8 +-
.../runtime/iterative/concurrent/Broker.java | 17 ++--
.../concurrent/IterationAggregatorBroker.java | 11 ++-
.../iterative/concurrent/SolutionSetBroker.java | 5 +-
.../SolutionSetUpdateBarrierBroker.java | 5 +-
.../iterative/concurrent/SuperstepBarrier.java | 22 +++--
.../concurrent/SuperstepKickoffLatch.java | 15 ++--
.../concurrent/SuperstepKickoffLatchBroker.java | 7 +-
.../WorksetEmptyConvergenceCriterion.java | 10 +--
.../iterative/event/AllWorkersDoneEvent.java | 11 ++-
.../event/IterationEventWithAggregators.java | 49 ++++++------
.../iterative/event/TerminationEvent.java | 9 +--
.../iterative/event/WorkerDoneEvent.java | 23 +++---
.../iterative/io/HashPartitionIterator.java | 12 +--
.../iterative/io/SerializedUpdateBuffer.java | 21 ++---
.../SolutionSetFastUpdateOutputCollector.java | 12 +--
...SolutionSetObjectsUpdateOutputCollector.java | 14 ++--
.../io/SolutionSetUpdateOutputCollector.java | 14 ++--
.../io/WorksetUpdateOutputCollector.java | 9 +--
.../iterative/task/AbstractIterativeTask.java | 56 ++++++-------
.../iterative/task/IterationHeadTask.java | 66 +++++++--------
.../task/IterationIntermediateTask.java | 23 +++---
.../task/IterationSynchronizationSinkTask.java | 52 ++++++------
.../iterative/task/IterationTailTask.java | 21 ++---
.../task/RuntimeAggregatorRegistry.java | 30 +++----
.../iterative/task/SyncEventHandler.java | 25 +++---
.../runtime/iterative/task/Terminable.java | 3 +-
.../concurrent/BlockingBackChannelTest.java | 21 ++---
.../iterative/concurrent/BrokerTest.java | 8 +-
.../iterative/concurrent/StringPair.java | 1 -
.../concurrent/SuperstepBarrierTest.java | 15 ++--
.../concurrent/SuperstepKickoffLatchTest.java | 84 ++++++++++----------
.../event/EventWithAggregatorsTest.java | 59 +++++++-------
35 files changed, 383 insertions(+), 369 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 654227a..df905dc 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -441,7 +441,6 @@ under the License.
**/runtime/highavailability/**,
**/runtime/instance/**,
**/runtime/io/**,
- **/runtime/iterative/**,
**/runtime/jobgraph/**,
**/runtime/jobmanager/**,
**/runtime/jobmaster/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
index 067bbfe..1373654 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
@@ -16,27 +16,26 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
/**
* A concurrent datastructure that establishes a backchannel buffer between an iteration head
* and an iteration tail.
*/
public class BlockingBackChannel {
- /** buffer to send back the superstep results */
+ /** Buffer to send back the superstep results. */
private final SerializedUpdateBuffer buffer;
- /** a one element queue used for blocking hand over of the buffer */
+ /** A one element queue used for blocking hand over of the buffer. */
private final BlockingQueue<SerializedUpdateBuffer> queue;
public BlockingBackChannel(SerializedUpdateBuffer buffer) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
index daa3ec3..120ecf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
@@ -16,23 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
/**
- * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails
+ * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails.
*/
public class BlockingBackChannelBroker extends Broker<BlockingBackChannel> {
- /**
- * Singleton instance
- */
private static final BlockingBackChannelBroker INSTANCE = new BlockingBackChannelBroker();
private BlockingBackChannelBroker() {}
/**
- * retrieve singleton instance
+ * Retrieve singleton instance.
*/
public static Broker<BlockingBackChannel> instance() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
index 444d21f..6816af4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
@@ -25,14 +24,14 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
- * A concurrent data structure that allows the hand-over of an object between a pair of threads
+ * A concurrent data structure that allows the hand-over of an object between a pair of threads.
*/
public class Broker<V> {
private final ConcurrentMap<String, BlockingQueue<V>> mediations = new ConcurrentHashMap<String, BlockingQueue<V>>();
/**
- * hand in the object to share
+ * Hand in the object to share.
*/
public void handIn(String key, V obj) {
if (!retrieveSharedQueue(key).offer(obj)) {
@@ -40,7 +39,7 @@ public class Broker<V> {
}
}
- /** blocking retrieval and removal of the object to share */
+ /** Blocking retrieval and removal of the object to share. */
public V getAndRemove(String key) {
try {
V objToShare = retrieveSharedQueue(key).take();
@@ -50,13 +49,13 @@ public class Broker<V> {
throw new RuntimeException(e);
}
}
-
- /** blocking retrieval and removal of the object to share */
+
+ /** Blocking retrieval and removal of the object to share. */
public void remove(String key) {
mediations.remove(key);
}
-
- /** blocking retrieval and removal of the object to share */
+
+ /** Blocking retrieval and removal of the object to share. */
public V get(String key) {
try {
BlockingQueue<V> queue = retrieveSharedQueue(key);
@@ -71,7 +70,7 @@ public class Broker<V> {
}
/**
- * thread-safe call to get a shared {@link BlockingQueue}
+ * Thread-safe call to get a shared {@link BlockingQueue}.
*/
private BlockingQueue<V> retrieveSharedQueue(String key) {
BlockingQueue<V> queue = mediations.get(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
index bf509f6..249a492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
@@ -16,17 +16,20 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
import org.apache.flink.runtime.iterative.task.RuntimeAggregatorRegistry;
+/**
+ * {@link Broker} for {@link RuntimeAggregatorRegistry}.
+ */
public class IterationAggregatorBroker extends Broker<RuntimeAggregatorRegistry> {
-
- /** single instance */
+
private static final IterationAggregatorBroker INSTANCE = new IterationAggregatorBroker();
- /** retrieve singleton instance */
+ /**
+ * Retrieve singleton instance.
+ */
public static IterationAggregatorBroker instance() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
index 3d9d0ea..f9ba0c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
@@ -23,9 +23,6 @@ package org.apache.flink.runtime.iterative.concurrent;
*/
public class SolutionSetBroker extends Broker<Object> {
- /**
- * Singleton instance
- */
private static final SolutionSetBroker INSTANCE = new SolutionSetBroker();
/**
@@ -34,6 +31,6 @@ public class SolutionSetBroker extends Broker<Object> {
public static Broker<Object> instance() {
return INSTANCE;
}
-
+
private SolutionSetBroker() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
index 352a262..9a1e40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
@@ -22,15 +22,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadTask;
import org.apache.flink.runtime.iterative.task.IterationTailTask;
/**
- * Broker to hand over {@link SolutionSetUpdateBarrier} from
+ * Broker to hand over {@link SolutionSetUpdateBarrier} from
* {@link IterationHeadTask} to
* {@link IterationTailTask}.
*/
public class SolutionSetUpdateBarrierBroker extends Broker<SolutionSetUpdateBarrier> {
- /**
- * Singleton instance
- */
private static final SolutionSetUpdateBarrierBroker INSTANCE = new SolutionSetUpdateBarrierBroker();
private SolutionSetUpdateBarrierBroker() {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
index cc5d3c5..54bb870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
@@ -16,22 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import java.util.concurrent.CountDownLatch;
-
import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.types.Value;
+import java.util.concurrent.CountDownLatch;
+
/**
* A resettable one-shot latch.
*/
public class SuperstepBarrier implements EventListener<TaskEvent> {
-
+
private final ClassLoader userCodeClassLoader;
private boolean terminationSignaled = false;
@@ -40,19 +39,17 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
private String[] aggregatorNames;
private Value[] aggregates;
-
-
+
public SuperstepBarrier(ClassLoader userCodeClassLoader) {
this.userCodeClassLoader = userCodeClassLoader;
}
-
- /** setup the barrier, has to be called at the beginning of each superstep */
+ /** Set up the barrier, has to be called at the beginning of each superstep. */
public void setup() {
latch = new CountDownLatch(1);
}
- /** wait on the barrier */
+ /** Wait on the barrier. */
public void waitForOtherWorkers() throws InterruptedException {
latch.await();
}
@@ -60,13 +57,12 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
public String[] getAggregatorNames() {
return aggregatorNames;
}
-
+
public Value[] getAggregates() {
return aggregates;
}
- /** barrier will release the waiting thread if an event occurs
- * @param event*/
+ /** Barrier will release the waiting thread if an event occurs. */
@Override
public void onEvent(TaskEvent event) {
if (event instanceof TerminationEvent) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
index 83b7a4a..b86ade2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
@@ -18,14 +18,17 @@
package org.apache.flink.runtime.iterative.concurrent;
+/**
+ * Latch used to wait for the previous superstep to complete.
+ */
public class SuperstepKickoffLatch {
-
+
private final Object monitor = new Object();
-
+
private int superstepNumber = 1;
-
+
private boolean terminated;
-
+
public void triggerNextSuperstep() {
synchronized (monitor) {
if (terminated) {
@@ -35,14 +38,14 @@ public class SuperstepKickoffLatch {
monitor.notifyAll();
}
}
-
+
public void signalTermination() {
synchronized (monitor) {
terminated = true;
monitor.notifyAll();
}
}
-
+
public boolean awaitStartOfSuperstepOrTermination(int superstep) throws InterruptedException {
while (true) {
synchronized (monitor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
index f137680..3b545c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
@@ -18,13 +18,18 @@
package org.apache.flink.runtime.iterative.concurrent;
+/**
+ * {@link Broker} for {@link SuperstepKickoffLatch}.
+ */
public class SuperstepKickoffLatchBroker extends Broker<SuperstepKickoffLatch> {
private static final SuperstepKickoffLatchBroker INSTANCE = new SuperstepKickoffLatchBroker();
private SuperstepKickoffLatchBroker() {}
-
+ /**
+ * Retrieve the singleton instance.
+ */
public static Broker<SuperstepKickoffLatch> instance() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index 2987b89..19c26bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -16,23 +16,23 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.convergence;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.types.LongValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * A workset iteration is by definition converged if no records have been updated in the solutionset
+ * A workset iteration is by definition converged if no records have been updated in the solutionset.
*/
public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger(WorksetEmptyConvergenceCriterion.class);
-
+
public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
index 2fd0db1..e62bf78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
@@ -16,19 +16,22 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.event;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
+import java.util.Map;
+
+/**
+ * Event sent by the {@code IterationSynchronizationSinkTask} to each
+ * {@code IterationHead} signaling to start a new superstep.
+ */
public class AllWorkersDoneEvent extends IterationEventWithAggregators {
public AllWorkersDoneEvent() {
super();
}
-
+
public AllWorkersDoneEvent(Map<String, Aggregator<?>> aggregators) {
super(aggregators);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index 4e1c19e..06b7687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -18,11 +18,6 @@
package org.apache.flink.runtime.iterative.event;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -32,13 +27,21 @@ import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for iteration {@link TaskEvent} transmitting operator aggregators.
+ */
public abstract class IterationEventWithAggregators extends TaskEvent {
-
+
protected static final String[] NO_STRINGS = new String[0];
protected static final Value[] NO_VALUES = new Value[0];
-
+
private String[] aggNames;
-
+
private String[] classNames;
private byte[][] serializedData;
@@ -53,11 +56,11 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
if (aggregatorName == null || aggregate == null) {
throw new NullPointerException();
}
-
+
this.aggNames = new String[] { aggregatorName };
this.aggregates = new Value[] { aggregate };
}
-
+
protected IterationEventWithAggregators(Map<String, Aggregator<?>> aggregators) {
int num = aggregators.size();
if (num == 0) {
@@ -66,7 +69,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
} else {
this.aggNames = new String[num];
this.aggregates = new Value[num];
-
+
int i = 0;
for (Map.Entry<String, Aggregator<?>> entry : aggregators.entrySet()) {
this.aggNames[i] = entry.getKey();
@@ -75,7 +78,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
}
}
}
-
+
public String[] getAggregatorNames() {
return this.aggNames;
}
@@ -97,21 +100,19 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
catch (ClassCastException e) {
throw new RuntimeException("User-defined aggregator class is not a value sublass.");
}
-
-
+
try (DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(
- new ByteArrayInputStream(serializedData[i])))
- {
+ new ByteArrayInputStream(serializedData[i]))) {
v.read(in);
}
catch (IOException e) {
throw new RuntimeException("Error while deserializing the user-defined aggregate class.", e);
}
-
+
aggregates[i] = v;
}
}
-
+
return this.aggregates;
}
@@ -119,15 +120,15 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
public void write(DataOutputView out) throws IOException {
int num = this.aggNames.length;
out.writeInt(num);
-
+
ByteArrayOutputStream boas = new ByteArrayOutputStream();
DataOutputViewStreamWrapper bufferStream = new DataOutputViewStreamWrapper(boas);
-
+
for (int i = 0; i < num; i++) {
// aggregator name and type
out.writeUTF(this.aggNames[i]);
out.writeUTF(this.aggregates[i].getClass().getName());
-
+
// aggregator value indirect as a byte array
this.aggregates[i].write(bufferStream);
bufferStream.flush();
@@ -160,14 +161,14 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
for (int i = 0; i < num; i++) {
this.aggNames[i] = in.readUTF();
this.classNames[i] = in.readUTF();
-
+
int len = in.readInt();
byte[] data = new byte[len];
this.serializedData[i] = data;
in.readFully(data);
-
+
}
-
+
this.aggregates = null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
index 28181e8..f1523df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
@@ -16,22 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.event;
-import java.io.IOException;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.TaskEvent;
+import java.io.IOException;
+
/**
- * Signals that the iteration is completely executed, participating tasks must terminate now
+ * Signals that the iteration is completely executed, participating tasks must terminate now.
*/
public class TerminationEvent extends TaskEvent {
public static final TerminationEvent INSTANCE = new TerminationEvent();
-
+
@Override
public void write(DataOutputView out) throws IOException {}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
index 2e348e9..ce24f23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
@@ -16,21 +16,24 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.event;
-import java.io.IOException;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Completion event sent from each {@code IterationHead} to the
+ * {@code IterationSynchronizationSinkTask}.
+ */
public class WorkerDoneEvent extends IterationEventWithAggregators {
-
+
private int workerIndex;
-
+
public WorkerDoneEvent() {
super();
}
@@ -39,22 +42,22 @@ public class WorkerDoneEvent extends IterationEventWithAggregators {
super(aggregatorName, aggregate);
this.workerIndex = workerIndex;
}
-
+
public WorkerDoneEvent(int workerIndex, Map<String, Aggregator<?>> aggregators) {
super(aggregators);
this.workerIndex = workerIndex;
}
-
+
public int getWorkerIndex() {
return workerIndex;
}
-
+
@Override
public void write(DataOutputView out) throws IOException {
out.writeInt(this.workerIndex);
super.write(out);
}
-
+
@Override
public void read(DataInputView in) throws IOException {
this.workerIndex = in.readInt();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
index 93ae55f..5bd3131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
@@ -18,17 +18,17 @@
package org.apache.flink.runtime.iterative.io;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.HashPartition;
import org.apache.flink.util.MutableObjectIterator;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Iterator;
+
/**
- * {@link Iterator} over the build side entries of a {@link HashPartition}
- *
+ * {@link Iterator} over the build side entries of a {@link HashPartition}.
+ *
* @param <BT>
*/
public class HashPartitionIterator<BT, PT> implements MutableObjectIterator<BT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index 7776894..353dcca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -16,9 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.io;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -28,14 +35,10 @@ import java.util.Deque;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-
+/**
+ * {@link AbstractPagedOutputView} used by the {@code BlockingBackChannel} for
+ * transmitting superstep results.
+ */
public class SerializedUpdateBuffer extends AbstractPagedOutputView {
private static final int HEADER_LENGTH = 4;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index f326d89..10962d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -18,17 +18,17 @@
package org.apache.flink.runtime.iterative.io;
-import java.io.IOException;
-
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+
/**
* A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
* is for example the case when a solution set update happens directly after a solution set join. If this assumption
* doesn't hold, use {@link SolutionSetUpdateOutputCollector}, which probes the hash table before updating.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
index 21a9cc8..6a57dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
@@ -24,12 +24,12 @@ import org.apache.flink.util.Collector;
/**
* A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
* already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.
- *
+ *
* @see SolutionSetFastUpdateOutputCollector
*/
public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T> {
@@ -37,9 +37,9 @@ public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T>
private final Collector<T> delegate;
private final JoinHashMap<T> hashMap;
-
+
private final TypeSerializer<T> serializer;
-
+
public SolutionSetObjectsUpdateOutputCollector(JoinHashMap<T> hashMap) {
this(hashMap, null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index c39efa5..90041c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -18,19 +18,19 @@
package org.apache.flink.runtime.iterative.io;
-import java.io.IOException;
-
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+
/**
* A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected if there is a match after probing the hash table. If the build side iterator is
* already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.
- *
+ *
* @see SolutionSetFastUpdateOutputCollector
*/
public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
index ad1d274..329e768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
@@ -16,19 +16,18 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.io;
-import java.io.IOException;
-
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+
/**
* A {@link Collector} to update the iteration workset (partial solution for bulk iterations).
- * <p>
- * The records are written to a {@link DataOutputView} to allow in-memory data exchange.
+ *
+ * <p>The records are written to a {@link DataOutputView} to allow in-memory data exchange.
*/
public class WorksetUpdateOutputCollector<T> implements Collector<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index 047fd7e..bde358c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -21,11 +21,6 @@ package org.apache.flink.runtime.iterative.task;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.Function;
@@ -33,7 +28,9 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -45,6 +42,7 @@ import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCri
import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
@@ -55,6 +53,9 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
@@ -64,10 +65,9 @@ import java.util.concurrent.Future;
* The abstract base class for all tasks able to participate in an iteration.
*/
public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
- implements Terminable
-{
+ implements Terminable {
private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);
-
+
protected LongSumAggregator worksetAggregator;
protected BlockingBackChannel worksetBackChannel;
@@ -77,14 +77,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
protected boolean isWorksetUpdate;
protected boolean isSolutionSetUpdate;
-
private RuntimeAggregatorRegistry iterationAggregators;
private String brokerKey;
private int superstepNum = 1;
-
+
private volatile boolean terminationRequested;
// --------------------------------------------------------------------------------------------
@@ -105,7 +104,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
}
}
}
-
+
TaskConfig config = getLastTasksConfig();
isWorksetIteration = config.getIsWorksetIteration();
isWorksetUpdate = config.getIsWorksetUpdate();
@@ -134,7 +133,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
} else {
reinstantiateDriver();
resetAllInputs();
-
+
// re-read the iterative broadcast variables
for (int i : this.iterativeBroadcastInputs) {
final String name = getTaskConfig().getBroadcastInputName(i);
@@ -144,7 +143,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
// call the parent to execute the superstep
super.run();
-
+
// release the iterative broadcast variables
for (int i : this.iterativeBroadcastInputs) {
final String name = getTaskConfig().getBroadcastInputName(i);
@@ -244,8 +243,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
@SuppressWarnings("unchecked")
MutableObjectIterator<Object> inIter = (MutableObjectIterator<Object>) this.inputIterators[inputNum];
Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
- while ((o = inIter.next(o)) != null);
-
+ while ((o = inIter.next(o)) != null) {
+ }
+
if (!reader.isFinished()) {
// also reset the end-of-superstep state
reader.startNextSuperstep();
@@ -253,17 +253,17 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
}
}
}
-
+
for (int inputNum : this.iterativeBroadcastInputs) {
MutableReader<?> reader = this.broadcastInputReaders[inputNum];
if (!reader.isFinished()) {
-
+
// sanity check that the BC input is at the end of the superstep
if (!reader.hasReachedEndOfSuperstep()) {
throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
}
-
+
reader.startNextSuperstep();
}
}
@@ -291,11 +291,11 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
/**
* Creates a new {@link WorksetUpdateOutputCollector}.
- * <p>
- * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+ *
+ * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
* workset.
- * <p>
- * If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
+ *
+ * <p>If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
* collect(T) of the delegate.
*
* @param delegate null -OR- the delegate on which to call collect() by the newly created collector
@@ -313,13 +313,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
/**
* Creates a new solution set update output collector.
- * <p>
- * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+ *
+ * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
* solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
* {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
* {@link SolutionSetUpdateOutputCollector} is created.
- * <p>
- * If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
+ *
+ * <p>If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
* collect(T) of the delegate.
*
* @param delegate null -OR- a delegate collector to be called by the newly created collector
@@ -328,7 +328,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
*/
protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
-
+
Object ss = solutionSetBroker.get(brokerKey());
if (ss instanceof CompactingHashTable) {
@SuppressWarnings("unchecked")
@@ -363,7 +363,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
public IterativeRuntimeUdfContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
- Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulatorMap, MetricGroup metrics) {
+ Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap, MetricGroup metrics) {
super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 575072d..b673ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -18,18 +18,6 @@
package org.apache.flink.runtime.iterative.task;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.operators.Driver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -39,6 +27,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
import org.apache.flink.runtime.iterative.concurrent.Broker;
@@ -54,12 +46,20 @@ import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* The head is responsible for coordinating an iteration and can run a
* {@link Driver} inside. It will read
@@ -71,11 +71,11 @@ import org.apache.flink.util.MutableObjectIterator;
* the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
* iteration is done, the head
* will send a {@link TerminationEvent} to all it's connected tasks, signaling them to shutdown.
- * <p>
- * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
+ *
+ * <p>Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
* step function. - The next m output gates to to the tasks that consume the final solution. - The last output gate
* connects to the synchronization task.
- *
+ *
* @param <X>
* The type of the bulk partial solution / solution set and the final output.
* @param <Y>
@@ -131,8 +131,8 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
}
/**
- * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
- * hands it to the iteration tail via a {@link Broker} singleton
+ * The iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
+ * hands it to the iteration tail via a {@link Broker} singleton.
**/
private BlockingBackChannel initBackChannel() throws Exception {
@@ -154,7 +154,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
return backChannel;
}
-
+
private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
// get some memory
double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
@@ -162,7 +162,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
-
+
TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
@@ -194,27 +194,27 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
}
}
}
-
+
private <BT> JoinHashMap<BT> initJoinHashMap() {
TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
(getUserCodeClassLoader());
TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
(getUserCodeClassLoader());
-
+
TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-
+
return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
}
-
+
private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
solutionSet.open();
solutionSet.buildTableWithUniqueKey(solutionSetInput);
}
-
+
private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
-
+
X next;
while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
solutionSet.insertOrReplace(next);
@@ -232,12 +232,12 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
public void run() throws Exception {
final String brokerKey = brokerKey();
final int workerIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
-
+
final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
CompactingHashTable<X> solutionSet = null; // if workset iteration
JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
-
+
boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
boolean isWorksetIteration = config.getIsWorksetIteration();
@@ -245,7 +245,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
/* used for receiving the current iteration result from iteration tail */
SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
-
+
BlockingBackChannel backChannel = initBackChannel();
SuperstepBarrier barrier = initSuperstepBarrier();
SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
@@ -262,7 +262,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
// setup the index for the solution set
@SuppressWarnings("unchecked")
MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
-
+
// read the initial solution set
if (objectSolutionSet) {
solutionSetObjectMap = initJoinHashMap();
@@ -284,7 +284,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
@SuppressWarnings("unchecked")
TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
solutionTypeSerializer = solSer;
-
+
// = termination Criterion tail
if (waitForSolutionSetUpdate) {
solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
@@ -352,7 +352,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
String[] globalAggregateNames = barrier.getAggregatorNames();
Value[] globalAggregates = barrier.getAggregates();
aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
-
+
nextStepKickoff.triggerNextSuperstep();
}
}
@@ -398,7 +398,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
out.collect(record);
}
}
-
+
private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
final MutableObjectIterator<X> results = hashTable.getEntryIterator();
final Collector<X> output = this.finalOutputCollector;
@@ -408,7 +408,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
output.collect(record);
}
}
-
+
@SuppressWarnings("unchecked")
private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) throws IOException {
final Collector<X> output = this.finalOutputCollector;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index 16a7008..c5fd133 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -19,14 +19,15 @@
package org.apache.flink.runtime.iterative.task;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.util.Collector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +35,11 @@ import java.io.IOException;
/**
* An intermediate iteration task, which runs a {@link org.apache.flink.runtime.operators.Driver} inside.
- * <p>
- * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
+ *
+ * <p>It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
* intermediate tasks can also update the iteration state, either the workset or the solution set.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
* a {@link BlockingBackChannel} for the workset -XOR- a HashTable for the solution set. In this case
* this task must be scheduled on the same instance as the head.
*/
@@ -64,7 +65,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
if (isSolutionSetUpdate) {
throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update.");
}
-
+
Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
// we need the WorksetUpdateOutputCollector separately to count the collected elements
@@ -80,7 +81,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
@Override
public void run() throws Exception {
-
+
SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
while (this.running && !terminationRequested()) {
@@ -98,19 +99,19 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
worksetAggregator.aggregate(numCollected);
}
-
+
if (log.isInfoEnabled()) {
log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
}
-
+
// let the successors know that the end of this superstep data is reached
sendEndOfSuperstep();
-
+
if (isWorksetUpdate) {
// notify iteration head if responsible for workset update
worksetBackChannel.notifyOfEndOfSuperstep();
}
-
+
boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
if (terminated) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 11a8cfa..6a38fcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -16,31 +16,31 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.task;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.types.IntValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.IntValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* The task responsible for synchronizing all iteration heads, implemented as an output task. This task
* will never see any data.
@@ -52,13 +52,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
private static final Logger log = LoggerFactory.getLogger(IterationSynchronizationSinkTask.class);
private MutableRecordReader<IntValue> headEventReader;
-
+
private SyncEventHandler eventHandler;
private ConvergenceCriterion<Value> convergenceCriterion;
private ConvergenceCriterion<Value> implicitConvergenceCriterion;
-
+
private Map<String, Aggregator<?>> aggregators;
private String convergenceAggregatorName;
@@ -66,13 +66,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
private String implicitConvergenceAggregatorName;
private int currentIteration = 1;
-
+
private int maxNumberOfIterations;
private final AtomicBoolean terminated = new AtomicBoolean(false);
// --------------------------------------------------------------------------------------------
-
+
@Override
public void invoke() throws Exception {
this.headEventReader = new MutableRecordReader<>(
@@ -80,13 +80,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
getEnvironment().getTaskManagerInfo().getTmpDirectories());
TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());
-
+
// store all aggregators
this.aggregators = new HashMap<>();
for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators(getUserCodeClassLoader())) {
aggregators.put(aggWithName.getName(), aggWithName.getAggregator());
}
-
+
// store the aggregator convergence criterion
if (taskConfig.usesConvergenceCriterion()) {
convergenceCriterion = taskConfig.getConvergenceCriterion(getUserCodeClassLoader());
@@ -100,9 +100,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
implicitConvergenceAggregatorName = taskConfig.getImplicitConvergenceCriterionAggregatorName();
Preconditions.checkNotNull(implicitConvergenceAggregatorName);
}
-
+
maxNumberOfIterations = taskConfig.getNumberOfIterations();
-
+
// set up the event handler
int numEventsTillEndOfSuperstep = taskConfig.getNumberOfEventsUntilInterruptInIterativeGate(0);
eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, aggregators,
@@ -110,7 +110,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
headEventReader.registerTaskEventListener(eventHandler, WorkerDoneEvent.class);
IntValue dummy = new IntValue();
-
+
while (!terminationRequested()) {
if (log.isInfoEnabled()) {
@@ -140,7 +140,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent(aggregators);
sendToAllWorkers(allWorkersDoneEvent);
-
+
// reset all aggregators
for (Aggregator<?> agg : aggregators.values()) {
agg.reset();
@@ -165,7 +165,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
if (aggregator == null) {
throw new RuntimeException("Error: Aggregator for convergence criterion was null.");
}
-
+
Value aggregate = aggregator.getAggregate();
if (convergenceCriterion.isConverged(currentIteration, aggregate)) {
@@ -194,14 +194,14 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
return true;
}
}
-
+
return false;
}
private void readHeadEventChannel(IntValue rec) throws IOException {
// reset the handler
eventHandler.resetEndOfSuperstep();
-
+
// read (and thereby process all events in the handler's event handling functions)
try {
if (this.headEventReader.next(rec)) {
@@ -222,9 +222,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
private String formatLogString(String message) {
return BatchTask.constructLogString(message, getEnvironment().getTaskInfo().getTaskName(), this);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public boolean terminationRequested() {
return terminated.get();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
index 9e0b560..3ec3a8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.iterative.task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
@@ -28,15 +26,18 @@ import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* An iteration tail, which runs a driver inside.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
* a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
* task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
* and the solution set.
- * <p>
- * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
+ *
+ * <p>If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
*/
public class IterationTailTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
@@ -45,7 +46,6 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-
@Override
protected void initialize() throws Exception {
@@ -80,6 +80,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
outputCollector = new Collector<OT>() {
@Override
public void collect(OT record) {}
+
@Override
public void close() {}
};
@@ -95,9 +96,9 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
@Override
public void run() throws Exception {
-
+
SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-
+
while (this.running && !terminationRequested()) {
if (log.isInfoEnabled()) {
@@ -119,7 +120,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
if (log.isInfoEnabled()) {
log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
}
-
+
if (isWorksetUpdate) {
// notify iteration head if responsible for workset update
worksetBackChannel.notifyOfEndOfSuperstep();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
index 7beb4c7..c4a30c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
@@ -18,56 +18,56 @@
package org.apache.flink.runtime.iterative.task;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.types.Value;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
/**
*
*/
public class RuntimeAggregatorRegistry {
-
+
private final Map<String, Aggregator<?>> aggregators;
-
+
private final Map<String, Value> previousGlobalAggregate;
-
+
public RuntimeAggregatorRegistry(Collection<AggregatorWithName<?>> aggs) {
this.aggregators = new HashMap<String, Aggregator<?>>();
this.previousGlobalAggregate = new HashMap<String, Value>();
-
+
for (AggregatorWithName<?> agg : aggs) {
this.aggregators.put(agg.getName(), agg.getAggregator());
}
}
-
+
public Value getPreviousGlobalAggregate(String name) {
return this.previousGlobalAggregate.get(name);
}
-
+
@SuppressWarnings("unchecked")
public <T extends Aggregator<?>> T getAggregator(String name) {
return (T) this.aggregators.get(name);
}
-
+
public Map<String, Aggregator<?>> getAllAggregators() {
return this.aggregators;
}
-
+
public void updateGlobalAggregatesAndReset(String[] names, Value[] aggregates) {
if (names == null || aggregates == null || names.length != aggregates.length) {
throw new IllegalArgumentException();
}
-
+
// add global aggregates
- for (int i = 0 ; i < names.length; i++) {
+ for (int i = 0; i < names.length; i++) {
this.previousGlobalAggregate.put(names[i], aggregates[i]);
}
-
+
// reset all aggregators
for (Aggregator<?> agg : this.aggregators.values()) {
agg.reset();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 71c15b1..45843ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -18,27 +18,30 @@
package org.apache.flink.runtime.iterative.task;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
+import java.util.Map;
+
+/**
+ * Listener for {@link WorkerDoneEvent} which also aggregates all aggregators
+ * from iteration tasks and signals the end of the superstep.
+ */
public class SyncEventHandler implements EventListener<TaskEvent> {
-
+
private final ClassLoader userCodeClassLoader;
-
+
private final Map<String, Aggregator<?>> aggregators;
private final int numberOfEventsUntilEndOfSuperstep;
private int workerDoneEventCounter;
-
- private boolean endOfSuperstep;
+ private boolean endOfSuperstep;
public SyncEventHandler(int numberOfEventsUntilEndOfSuperstep, Map<String, Aggregator<?>> aggregators, ClassLoader userCodeClassLoader) {
Preconditions.checkArgument(numberOfEventsUntilEndOfSuperstep > 0);
@@ -60,7 +63,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
if (this.endOfSuperstep) {
throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
}
-
+
workerDoneEventCounter++;
String[] aggNames = workerDoneEvent.getAggregatorNames();
@@ -69,7 +72,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
if (aggNames.length != aggregates.length) {
throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
}
-
+
for (int i = 0; i < aggNames.length; i++) {
@SuppressWarnings("unchecked")
Aggregator<Value> aggregator = (Aggregator<Value>) this.aggregators.get(aggNames[i]);
@@ -81,11 +84,11 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
Thread.currentThread().interrupt();
}
}
-
+
public boolean isEndOfSuperstep() {
return this.endOfSuperstep;
}
-
+
public void resetEndOfSuperstep() {
this.endOfSuperstep = false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
index d6f7544..9d952ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
@@ -16,11 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.task;
/**
- * Models the functionality that the termination of an iterative task can be requested from outside
+ * Models the functionality that the termination of an iterative task can be requested from outside.
*/
public interface Terminable {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
index d22a23b..c9015e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
@@ -16,25 +16,26 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import org.mockito.Mockito;
-import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link BlockingBackChannel}.
+ */
public class BlockingBackChannelTest {
private static final int NUM_ITERATIONS = 3;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
index 3f2c243..e12cb32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
+import org.apache.flink.util.Preconditions;
+
import com.google.common.collect.Lists;
import org.junit.Test;
-import org.apache.flink.util.Preconditions;
-
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -35,6 +34,9 @@ import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link Broker}.
+ */
public class BrokerTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
index 798d9f5..82ddac7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
class StringPair {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index 2f26670..1187adf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -16,19 +16,22 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Random;
-
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
+
import org.junit.Test;
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SuperstepBarrier}.
+ */
public class SuperstepBarrierTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
index 8c8deb2..90555d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
@@ -21,30 +21,33 @@ package org.apache.flink.runtime.iterative.concurrent;
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Tests for {@link SuperstepKickoffLatch}.
+ */
public class SuperstepKickoffLatchTest {
@Test
public void testWaitFromOne() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
-
+
Waiter w = new Waiter(latch, 2);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
Thread.sleep(100);
-
+
latch.triggerNextSuperstep();
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
throw w.getError();
}
@@ -54,28 +57,28 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
@Test
public void testWaitAlreadyFulfilled() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
-
+
Waiter w = new Waiter(latch, 2);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
Thread.sleep(100);
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
throw w.getError();
}
@@ -85,14 +88,14 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
@Test
public void testWaitIncorrect() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
latch.triggerNextSuperstep();
-
+
try {
latch.awaitStartOfSuperstepOrTermination(2);
Assert.fail("should throw exception");
@@ -106,29 +109,29 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + e.getMessage());
}
}
-
+
@Test
public void testWaitIncorrectAsync() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
latch.triggerNextSuperstep();
-
+
Waiter w = new Waiter(latch, 2);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
Thread.sleep(100);
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
if (!(w.getError() instanceof IllegalStateException)) {
throw new Exception("wrong exception type " + w.getError());
@@ -142,29 +145,29 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
@Test
public void testWaitForTermination() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
latch.triggerNextSuperstep();
-
+
Waiter w = new Waiter(latch, 4);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
latch.signalTermination();
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
throw w.getError();
}
@@ -174,16 +177,15 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
private static class Waiter implements Runnable {
private final SuperstepKickoffLatch latch;
-
+
private final int waitFor;
-
+
private volatile Throwable error;
-
-
+
public Waiter(SuperstepKickoffLatch latch, int waitFor) {
this.latch = latch;
this.waitFor = waitFor;
@@ -198,37 +200,37 @@ public class SuperstepKickoffLatchTest {
this.error = t;
}
}
-
+
public Throwable getError() {
return error;
}
}
-
+
private static class WatchDog extends Thread {
-
+
private final Thread toWatch;
-
+
private final long timeOut;
-
+
private volatile Throwable failed;
-
+
public WatchDog(Thread toWatch, long timeout) {
setDaemon(true);
setName("Watchdog");
this.toWatch = toWatch;
this.timeOut = timeout;
}
-
+
@SuppressWarnings("deprecation")
@Override
public void run() {
try {
toWatch.join(timeOut);
-
+
if (toWatch.isAlive()) {
this.failed = new Exception("timed out");
toWatch.interrupt();
-
+
toWatch.join(2000);
if (toWatch.isAlive()) {
toWatch.stop();
@@ -239,7 +241,7 @@ public class SuperstepKickoffLatchTest {
failed = t;
}
}
-
+
public Throwable getError() {
return failed;
}
[3/7] flink git commit: [FLINK-7070] Use properly built custom jar in
ScalaShellITCase
Posted by ch...@apache.org.
[FLINK-7070] Use properly built custom jar in ScalaShellITCase
Before, the external jar loading tests were using the flink-ml jar
without the scala shell package actually declaring this as a dependency.
Now we build our own jar and have no unlisted dependencies anymore.
This closes #4249.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/343a8042
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/343a8042
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/343a8042
Branch: refs/heads/master
Commit: 343a804209dc5eb030b18ccb4c03afc459b150c5
Parents: 41806ba
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jul 3 17:09:25 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 11:37:12 2017 +0200
----------------------------------------------------------------------
flink-scala-shell/pom.xml | 55 ++++++++++++++++
.../test-scalashell-customjar-assembly.xml | 35 ++++++++++
.../flink/api/scala/ScalaShellITCase.scala | 67 ++++----------------
.../flink/api/scala/jar/TestingData.scala | 26 ++++++++
.../apache/flink/api/scala/jar/package.scala | 25 ++++++++
5 files changed, 152 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 6f490e4..e312699 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -242,6 +242,61 @@ under the License.
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version><!--$NO-MVN-MAN-VER$-->
+ <executions>
+ <execution>
+ <id>create-library-loading-jar</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <!--<archive>-->
+ <!--<manifest>-->
+ <!--<mainClass>org.apache.flink.test.classloading.jar.KMeansForTest</mainClass>-->
+ <!--</manifest>-->
+ <!--</archive>-->
+ <finalName>customjar</finalName>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/test/assembly/test-scalashell-customjar-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <!--Remove the classes in the jar package from the test-classes directory since they
+ mustn't be in the classpath when running ScalaShellITCase to actually test loading
+ of external jars.-->
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version><!--$NO-MVN-MAN-VER$-->
+ <executions>
+ <execution>
+ <id>remove-classloading-test-dependencies</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>clean</goal>
+ </goals>
+ <configuration>
+ <excludeDefaultDirectories>true</excludeDefaultDirectories>
+ <filesets>
+ <fileset>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <includes>
+ <include>**/jar/*.class</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml b/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml
new file mode 100644
index 0000000..ef20cfd
--- /dev/null
+++ b/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml
@@ -0,0 +1,35 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<assembly>
+ <id>test-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <outputDirectory>/</outputDirectory>
+ <!--modify/add include to match your package(s) -->
+ <includes>
+ <include>org/apache/flink/api/scala/jar/*</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 0e89da3..ce08304 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -169,36 +169,19 @@ class ScalaShellITCase extends TestLogger {
def testSubmissionOfExternalLibraryBatch: Unit = {
val input =
"""
- import org.apache.flink.ml.math._
- val denseVectors = benv.fromElements[Vector](DenseVector(1.0, 2.0, 3.0))
- denseVectors.print()
+ import org.apache.flink.api.scala.jar.TestingData
+ val source = benv.fromCollection(TestingData.elements)
+ source.print()
""".stripMargin
- // find jar file that contains the ml code
- var externalJar = ""
- val folder = findLibraryFolder(
- "../flink-libraries/flink-ml/target/",
- "../../flink-libraries/flink-ml/target/")
-
- val listOfFiles = folder.listFiles()
-
- for (i <- listOfFiles.indices) {
- val filename: String = listOfFiles(i).getName
- if (!filename.contains("test") && !filename.contains("original") && filename.contains(
- ".jar")) {
- externalJar = listOfFiles(i).getAbsolutePath
- }
- }
-
- assert(externalJar != "")
-
- val output: String = processInShell(input, Option(externalJar))
+ val output: String = processInShell(input, Option("customjar-test-jar.jar"))
Assert.assertFalse(output.contains("failed"))
Assert.assertFalse(output.contains("error"))
Assert.assertFalse(output.contains("Exception"))
- Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
+
+ Assert.assertTrue(output.contains("\nHELLO 42"))
}
/** Submit external library */
@@ -206,37 +189,19 @@ class ScalaShellITCase extends TestLogger {
def testSubmissionOfExternalLibraryStream: Unit = {
val input =
"""
- import org.apache.flink.ml.math._
- val denseVectors = senv.fromElements[Vector](DenseVector(1.0, 2.0, 3.0))
- denseVectors.print()
+ import org.apache.flink.api.scala.jar.TestingData
+ val source = senv.fromCollection(TestingData.elements)
+ source.print()
senv.execute
""".stripMargin
- // find jar file that contains the ml code
- var externalJar = ""
- val folder = findLibraryFolder(
- "../flink-libraries/flink-ml/target/",
- "../../flink-libraries/flink-ml/target/")
-
- val listOfFiles = folder.listFiles()
-
- for (i <- listOfFiles.indices) {
- val filename: String = listOfFiles(i).getName
- if (!filename.contains("test") && !filename.contains("original") && filename.contains(
- ".jar")) {
- externalJar = listOfFiles(i).getAbsolutePath
- }
- }
-
- assert(externalJar != "")
-
- val output: String = processInShell(input, Option(externalJar))
+ val output: String = processInShell(input, Option("customjar-test-jar.jar"))
Assert.assertFalse(output.contains("failed"))
Assert.assertFalse(output.contains("error"))
Assert.assertFalse(output.contains("Exception"))
- Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
+ Assert.assertTrue(output.contains("\nHELLO 42"))
}
@@ -413,14 +378,4 @@ object ScalaShellITCase {
case _ => throw new IllegalStateException("The cluster has not been started.")
}
}
-
- def findLibraryFolder(paths: String*): File = {
- for (path <- paths) {
- val folder = new File(path)
- if (folder.exists()) {
- return folder
- }
- }
- throw new RuntimeException("Library folder not found in any of the supplied paths!")
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala
new file mode 100644
index 0000000..428a8e6
--- /dev/null
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.jar
+
+/**
+ * Testing data for [[org.apache.flink.api.scala.ScalaShellITCase]]. This will be put into a
+ * separate jar file to test loading of external libraries.
+ */
+object TestingData {
+ val elements = Seq("HELLO 42", "CIAO", "BLA", "BLU")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala
new file mode 100644
index 0000000..51303f2
--- /dev/null
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+/**
+ * Custom objects for use in testing loading of external jars in [[ScalaShellITCase]].
+ */
+package object jar {
+
+}
[7/7] flink git commit: [FLINK-6826] [runtime] Activate checkstyle
for runtime/net
Posted by ch...@apache.org.
[FLINK-6826] [runtime] Activate checkstyle for runtime/net
This closes #4065.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85853eda
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85853eda
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85853eda
Branch: refs/heads/master
Commit: 85853edab3e76ec83790f1b9c8f51bd2f6afe087
Parents: c59537e
Author: zentol <ch...@apache.org>
Authored: Fri Jun 2 23:25:16 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:33 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../flink/runtime/net/ConnectionUtils.java | 57 ++++++++++----------
.../org/apache/flink/runtime/net/SSLUtils.java | 15 +++---
.../flink/runtime/net/ConnectionUtilsTest.java | 17 +++---
.../apache/flink/runtime/net/SSLUtilsTest.java | 26 ++++-----
5 files changed, 59 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 1fff305..a21f753 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -447,7 +447,6 @@ under the License.
**/runtime/memory/**,
**/runtime/messages/**,
**/runtime/minicluster/**,
- **/runtime/net/**,
**/runtime/operators/**,
**/runtime/plugable/**,
**/runtime/query/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 75a7ebe..673d27c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -18,6 +18,13 @@
package org.apache.flink.runtime.net;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -32,25 +39,17 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import scala.concurrent.duration.FiniteDuration;
-
/**
* Utilities to determine the network interface and address that should be used to bind the
* TaskManager communication to.
- *
+ *
* <p>Implementation note: This class uses {@code System.nanoTime()} to measure elapsed time, because
* that is not susceptible to clock changes.
*/
public class ConnectionUtils {
-
+
private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);
private static final long MIN_SLEEP_TIME = 50;
@@ -61,15 +60,15 @@ public class ConnectionUtils {
* There is only a state transition if the current state failed to determine the address.
*/
private enum AddressDetectionState {
- /** Connect from interface returned by InetAddress.getLocalHost() **/
+ /** Connect from interface returned by InetAddress.getLocalHost(). **/
LOCAL_HOST(200),
/** Detect own IP address based on the target IP address. Look for common prefix */
ADDRESS(50),
- /** Try to connect on all Interfaces and all their addresses with a low timeout */
+ /** Try to connect on all Interfaces and all their addresses with a low timeout. */
FAST_CONNECT(50),
- /** Try to connect on all Interfaces and all their addresses with a long timeout */
+ /** Try to connect on all Interfaces and all their addresses with a long timeout. */
SLOW_CONNECT(1000),
- /** Choose any non-loopback address */
+ /** Choose any non-loopback address. */
HEURISTIC(0);
private final int timeout;
@@ -90,11 +89,11 @@ public class ConnectionUtils {
* given target, so it only succeeds if the target socket address actually accepts
* connections. The method tries various strategies multiple times and uses an exponential
* backoff timer between tries.
- * <p>
- * If no connection attempt was successful after the given maximum time, the method
+ *
+ * <p>If no connection attempt was successful after the given maximum time, the method
* will choose some address based on heuristics (excluding link-local and loopback addresses.)
- * <p>
- * This method will initially not log on info level (to not flood the log while the
+ *
+ * <p>This method will initially not log on info level (to not flood the log while the
* backoff time is still very low). It will start logging after a certain time
* has passes.
*
@@ -104,8 +103,7 @@ public class ConnectionUtils {
* @param startLoggingAfter The time after which the method will log on INFO level.
*/
public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
- long maxWaitMillis, long startLoggingAfter) throws IOException
- {
+ long maxWaitMillis, long startLoggingAfter) throws IOException {
if (targetAddress == null) {
throw new NullPointerException("targetAddress must not be null");
}
@@ -188,11 +186,11 @@ public class ConnectionUtils {
*/
private static InetAddress tryLocalHostBeforeReturning(
InetAddress preliminaryResult, SocketAddress targetAddress, boolean logging) throws IOException {
-
+
InetAddress localhostName = InetAddress.getLocalHost();
-
+
if (preliminaryResult.equals(localhostName)) {
- // preliminary result is equal to the local host name
+ // preliminary result is equal to the local host name
return preliminaryResult;
}
else if (tryToConnect(localhostName, targetAddress, AddressDetectionState.SLOW_CONNECT.getTimeout(), logging)) {
@@ -209,7 +207,7 @@ public class ConnectionUtils {
/**
* Try to find a local address which allows as to connect to the targetAddress using the given
- * strategy
+ * strategy.
*
* @param strategy Depending on the strategy, the method will enumerate all interfaces, trying to connect
* to the target address
@@ -220,8 +218,7 @@ public class ConnectionUtils {
*/
private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
InetSocketAddress targetAddress,
- boolean logging) throws IOException
- {
+ boolean logging) throws IOException {
// try LOCAL_HOST strategy independent of the network interfaces
if (strategy == AddressDetectionState.LOCAL_HOST) {
InetAddress localhostName;
@@ -316,8 +313,7 @@ public class ConnectionUtils {
* @throws IOException Thrown if the socket cleanup fails.
*/
private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
- int timeout, boolean logFailed) throws IOException
- {
+ int timeout, boolean logFailed) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
+ " with timeout " + timeout);
@@ -341,6 +337,9 @@ public class ConnectionUtils {
}
}
+ /**
+ * A {@link LeaderRetrievalListener} that allows retrieving an {@link InetAddress} for the current leader.
+ */
public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
private static final FiniteDuration defaultLoggingDelay = new FiniteDuration(400, TimeUnit.MILLISECONDS);
@@ -351,7 +350,7 @@ public class ConnectionUtils {
NEWLY_RETRIEVED
}
- final private Object retrievalLock = new Object();
+ private final Object retrievalLock = new Object();
private String akkaURL;
private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 015b3d6..663d221 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.TrustManagerFactory;
+
import java.io.File;
import java.io.FileInputStream;
import java.net.ServerSocket;
@@ -38,13 +39,13 @@ import java.security.KeyStore;
import java.util.Arrays;
/**
- * Common utilities to manage SSL transport settings
+ * Common utilities to manage SSL transport settings.
*/
public class SSLUtils {
private static final Logger LOG = LoggerFactory.getLogger(SSLUtils.class);
/**
- * Retrieves the global ssl flag from configuration
+ * Retrieves the global ssl flag from configuration.
*
* @param sslConfig
* The application configuration
@@ -58,7 +59,7 @@ public class SSLUtils {
}
/**
- * Sets SSl version and cipher suites for SSLServerSocket
+ * Sets SSL version and cipher suites for SSLServerSocket.
* @param socket
* Socket to be handled
* @param config
@@ -81,7 +82,7 @@ public class SSLUtils {
}
/**
- * Sets SSL version and cipher suites for SSLEngine
+ * Sets SSL version and cipher suites for SSLEngine.
* @param engine
* SSLEngine to be handled
* @param config
@@ -93,7 +94,7 @@ public class SSLUtils {
}
/**
- * Sets SSL options to verify peer's hostname in the certificate
+ * Sets SSL options to verify peer's hostname in the certificate.
*
* @param sslConfig
* The application configuration
@@ -112,7 +113,7 @@ public class SSLUtils {
}
/**
- * Creates the SSL Context for the client if SSL is configured
+ * Creates the SSL Context for the client if SSL is configured.
*
* @param sslConfig
* The application configuration
@@ -160,7 +161,7 @@ public class SSLUtils {
}
/**
- * Creates the SSL Context for the server if SSL is configured
+ * Creates the SSL Context for the server if SSL is configured.
*
* @param sslConfig
* The application configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
index b5c2819..d7c4baa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -15,14 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.runtime.net;
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.UnknownHostException;
+package org.apache.flink.runtime.net;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -31,6 +25,13 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -48,7 +49,7 @@ public class ConnectionUtilsTest {
// the "blocker" server socket simply does not accept connections
// this address is consequently "unreachable"
InetSocketAddress unreachable = new InetSocketAddress("localhost", blocker.getLocalPort());
-
+
final long start = System.nanoTime();
InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400);
http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index a3c2b7b..87d0ccc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -15,26 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.net;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
+
import org.junit.Assert;
import org.junit.Test;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLServerSocket;
+
import java.net.ServerSocket;
-import java.util.Random;
-/*
- * Tests for the SSL utilities
+/**
+ * Tests for the {@link SSLUtils}.
*/
public class SSLUtilsTest {
/**
- * Tests if SSL Client Context is created given a valid SSL configuration
+ * Tests if SSL Client Context is created given a valid SSL configuration.
*/
@Test
public void testCreateSSLClientContext() throws Exception {
@@ -49,7 +51,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSL Client Context is not created if SSL is not configured
+ * Tests if SSL Client Context is not created if SSL is not configured.
*/
@Test
public void testCreateSSLClientContextWithSSLDisabled() throws Exception {
@@ -62,7 +64,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSL Client Context creation fails with bad SSL configuration
+ * Tests if SSL Client Context creation fails with bad SSL configuration.
*/
@Test
public void testCreateSSLClientContextMisconfiguration() {
@@ -81,7 +83,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSL Server Context is created given a valid SSL configuration
+ * Tests if SSL Server Context is created given a valid SSL configuration.
*/
@Test
public void testCreateSSLServerContext() throws Exception {
@@ -97,7 +99,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSL Server Context is not created if SSL is disabled
+ * Tests if SSL Server Context is not created if SSL is disabled.
*/
@Test
public void testCreateSSLServerContextWithSSLDisabled() throws Exception {
@@ -110,7 +112,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSL Server Context creation fails with bad SSL configuration
+ * Tests if SSL Server Context creation fails with bad SSL configuration.
*/
@Test
public void testCreateSSLServerContextMisconfiguration() {
@@ -130,7 +132,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSL Server Context creation fails with bad SSL configuration
+ * Tests if SSL Server Context creation fails with bad SSL configuration.
*/
@Test
public void testCreateSSLServerContextWithMultiProtocols() {
@@ -151,7 +153,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSLUtils set the right ssl version and cipher suites for SSLServerSocket
+ * Tests if SSLUtils set the right ssl version and cipher suites for SSLServerSocket.
*/
@Test
public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception {
@@ -192,7 +194,7 @@ public class SSLUtilsTest {
}
/**
- * Tests if SSLUtils set the right ssl version and cipher suites for SSLEngine
+ * Tests if SSLUtils set the right ssl version and cipher suites for SSLEngine.
*/
@Test
public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception {
[5/7] flink git commit: [FLINK-6821] [runtime] Activate checkstyle
for runtime/fs
Posted by ch...@apache.org.
[FLINK-6821] [runtime] Activate checkstyle for runtime/fs
This closes #4063.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9f659e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9f659e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9f659e0
Branch: refs/heads/master
Commit: c9f659e046a7b42e79d72df74bead5809ab2fe46
Parents: 31ad802
Author: zentol <ch...@apache.org>
Authored: Fri Jun 2 21:16:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:31 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../runtime/fs/hdfs/HadoopBlockLocation.java | 16 ++---
.../runtime/fs/hdfs/HadoopDataInputStream.java | 19 +++--
.../runtime/fs/hdfs/HadoopDataOutputStream.java | 8 ++-
.../flink/runtime/fs/hdfs/HadoopFileStatus.java | 2 +-
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 75 +++++++++-----------
.../flink/runtime/fs/maprfs/MapRFileSystem.java | 29 ++++----
.../fs/hdfs/HadoopDataInputStreamTest.java | 3 +
8 files changed, 73 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 80f95a5..1fff305 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -436,7 +436,6 @@ under the License.
**/runtime/deployment/**,
**/runtime/execution/**,
**/runtime/executiongraph/**,
- **/runtime/fs/**,
**/runtime/heartbeat/**,
**/runtime/highavailability/**,
**/runtime/instance/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
index a1cc72c..1484c95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -16,19 +16,17 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.fs.hdfs;
+import org.apache.flink.core.fs.BlockLocation;
+
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.flink.core.fs.BlockLocation;
-
/**
* Implementation of the {@link BlockLocation} interface for the
* Hadoop Distributed File System.
- *
*/
public final class HadoopBlockLocation implements BlockLocation {
@@ -53,8 +51,8 @@ public final class HadoopBlockLocation implements BlockLocation {
private String[] hostnames;
/**
- * Creates a new block location
- *
+ * Creates a new block location.
+ *
* @param blockLocation
* the original HDFS block location
*/
@@ -63,7 +61,6 @@ public final class HadoopBlockLocation implements BlockLocation {
this.blockLocation = blockLocation;
}
-
@Override
public String[] getHosts() throws IOException {
@@ -88,7 +85,7 @@ public final class HadoopBlockLocation implements BlockLocation {
/**
* Looks for a domain suffix in a FQDN and strips it if present.
- *
+ *
* @param originalHostname
* the original hostname, possibly an FQDN
* @return the stripped hostname without the domain suffix
@@ -114,21 +111,18 @@ public final class HadoopBlockLocation implements BlockLocation {
return originalHostname.substring(0, index);
}
-
@Override
public long getLength() {
return this.blockLocation.getLength();
}
-
@Override
public long getOffset() {
return this.blockLocation.getOffset();
}
-
@Override
public int compareTo(final BlockLocation o) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 3cc841e..da50c4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -27,19 +27,19 @@ import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Concrete implementation of the {@link FSDataInputStream} for the Hadoop's input streams.
+ * Concrete implementation of the {@link FSDataInputStream} for Hadoop's input streams.
* This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
*/
public final class HadoopDataInputStream extends FSDataInputStream {
/**
* Minimum amount of bytes to skip forward before we issue a seek instead of discarding read.
- * <p>
- * The current value is just a magic number. In the long run, this value could become configurable, but for now it
+ *
+ * <p>The current value is just a magic number. In the long run, this value could become configurable, but for now it
* is a conservative, relatively small value that should bring safe improvements for small skips (e.g. in reading
* meta data), that would hurt the most with frequent seeks.
- * <p>
- * The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
+ *
+ * <p>The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
* For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
* avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
* be the amounts of bytes the can be consumed sequentially within the seektime. Unfortunately, seektime is not
@@ -47,11 +47,11 @@ public final class HadoopDataInputStream extends FSDataInputStream {
*/
public static final int MIN_SKIP_BYTES = 1024 * 1024;
- /** The internal stream */
+ /** The internal stream. */
private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
/**
- * Creates a new data input stream from the given Hadoop input stream
+ * Creates a new data input stream from the given Hadoop input stream.
*
* @param fsDataInputStream The Hadoop input stream
*/
@@ -59,7 +59,6 @@ public final class HadoopDataInputStream extends FSDataInputStream {
this.fsDataInputStream = checkNotNull(fsDataInputStream);
}
-
@Override
public void seek(long seekPos) throws IOException {
// We do some optimizations to avoid that some implementations of distributed FS perform
@@ -116,8 +115,8 @@ public final class HadoopDataInputStream extends FSDataInputStream {
/**
* Positions the stream to the given location. In contrast to {@link #seek(long)}, this method will
* always issue a "seek" command to the dfs and may not replace it by {@link #skip(long)} for small seeks.
- * <p>
- * Notice that the underlying DFS implementation can still decide to do skip instead of seek.
+ *
+ * <p>Notice that the underlying DFS implementation can still decide to do skip instead of seek.
*
* @param seekPos the position to seek to.
* @throws IOException
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
index 8787181..1b8d1a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
@@ -18,10 +18,14 @@
package org.apache.flink.runtime.fs.hdfs;
-import java.io.IOException;
-
import org.apache.flink.core.fs.FSDataOutputStream;
+import java.io.IOException;
+
+/**
+ * Concrete implementation of the {@link FSDataOutputStream} for Hadoop's output streams.
+ * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
+ */
public class HadoopDataOutputStream extends FSDataOutputStream {
private final org.apache.hadoop.fs.FSDataOutputStream fdos;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
index 519791e..17bb334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -31,7 +31,7 @@ public final class HadoopFileStatus implements FileStatus {
/**
* Creates a new file status from a HDFS file status.
- *
+ *
* @param fileStatus
* the HDFS file status
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 1371d21..f47423f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.configuration.ConfigConstants;
@@ -25,6 +26,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;
+
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,16 +43,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
* class is a wrapper class which encapsulated the original Hadoop HDFS API.
*
- * If no file system class is specified, the wrapper will automatically load the Hadoop
+ * <p>If no file system class is specified, the wrapper will automatically load the Hadoop
* distributed file system (HDFS).
*
*/
public final class HadoopFileSystem extends FileSystem implements HadoopFileSystemWrapper {
-
+
private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystem.class);
-
+
private static final String DEFAULT_HDFS_CLASS = "org.apache.hadoop.hdfs.DistributedFileSystem";
-
+
/**
* Configuration value name for the DFS implementation name. Usually not specified in hadoop configurations.
*/
@@ -64,7 +66,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
/**
* Creates a new DistributedFileSystem object to access HDFS, based on a class name
* and picking up the configuration from the class path or the Flink configuration.
- *
+ *
* @throws IOException
* throw if the required HDFS classes cannot be instantiated
*/
@@ -72,7 +74,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
// Create new Hadoop configuration object
this.conf = getHadoopConfiguration();
- if(fsClass == null) {
+ if (fsClass == null) {
fsClass = getDefaultHDFSClass();
}
@@ -126,8 +128,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
// fall back to an older Hadoop version
- if (fsClass == null)
- {
+ if (fsClass == null) {
// first of all, check for a user-defined hdfs class
if (LOG.isDebugEnabled()) {
LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '{}'.",
@@ -136,13 +137,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
- if (classFromConfig != null)
- {
+ if (classFromConfig != null) {
if (org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(classFromConfig)) {
fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName() );
+ LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName());
}
}
else {
@@ -187,7 +187,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
/**
- * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
+ * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
* in the main configuration (flink-conf.yaml).
* This method is public because its being used in the HadoopDataSource.
*/
@@ -215,15 +215,15 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
} else {
LOG.trace("{} configuration key for hdfs-site configuration file not set", ConfigConstants.HDFS_SITE_CONFIG);
}
-
+
// 2. Approach environment variables
- String[] possibleHadoopConfPaths = new String[4];
+ String[] possibleHadoopConfPaths = new String[4];
possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-
+
if (System.getenv("HADOOP_HOME") != null) {
- possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf";
- possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+ possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
+ possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
}
for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
@@ -245,10 +245,9 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
return retConf;
}
-
+
private org.apache.hadoop.fs.FileSystem instantiateFileSystem(Class<? extends org.apache.hadoop.fs.FileSystem> fsClass)
- throws IOException
- {
+ throws IOException {
try {
return fsClass.newInstance();
}
@@ -266,7 +265,6 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
}
-
@Override
public Path getWorkingDirectory() {
return new Path(this.fs.getWorkingDirectory().toUri());
@@ -288,30 +286,30 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
return this.fs;
}
-
+
@Override
public void initialize(URI path) throws IOException {
-
+
// If the authority is not part of the path, we initialize with the fs.defaultFS entry.
if (path.getAuthority() == null) {
-
+
String configEntry = this.conf.get("fs.defaultFS", null);
if (configEntry == null) {
// fs.default.name deprecated as of hadoop 2.2.0 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
configEntry = this.conf.get("fs.default.name", null);
}
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("fs.defaultFS is set to {}", configEntry);
}
-
+
if (configEntry == null) {
throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system (hdfs) configuration was registered, " +
"or that configuration did not contain an entry for the default file system (usually 'fs.defaultFS').");
} else {
try {
URI initURI = URI.create(configEntry);
-
+
if (initURI.getAuthority() == null) {
throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system was registered, " +
"or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) " +
@@ -330,7 +328,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
throw new IOException(getMissingAuthorityErrorPrefix(path) +
"The configuration contains an invalid file system default name (fs.default.name or fs.defaultFS): " + configEntry);
}
- }
+ }
}
else {
// Initialize file system
@@ -341,11 +339,11 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
String message = "The (HDFS NameNode) host at '" + path.getAuthority()
+ "', specified by file path '" + path.toString() + "', cannot be resolved"
+ (e.getMessage() != null ? ": " + e.getMessage() : ".");
-
+
if (path.getPort() == -1) {
message += " Hint: Have you forgotten a slash? (correct URI would be 'hdfs:///" + path.getAuthority() + path.getPath() + "' ?)";
}
-
+
throw new IOException(message, e);
}
catch (Exception e) {
@@ -355,14 +353,13 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
}
}
-
+
private static String getMissingAuthorityErrorPrefix(URI path) {
return "The given HDFS file URI (" + path.toString() + ") did not describe the HDFS NameNode." +
- " The attempt to use a default HDFS configuration, as specified in the '" + ConfigConstants.HDFS_DEFAULT_CONFIG + "' or '" +
+ " The attempt to use a default HDFS configuration, as specified in the '" + ConfigConstants.HDFS_DEFAULT_CONFIG + "' or '" +
ConfigConstants.HDFS_SITE_CONFIG + "' config parameter failed due to the following problem: ";
}
-
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
@@ -371,8 +368,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
- throws IOException
- {
+ throws IOException {
if (!(file instanceof HadoopFileStatus)) {
throw new IOException("file is not an instance of DistributedFileStatus");
}
@@ -407,15 +403,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
@Override
public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
- final short replication, final long blockSize)
- throws IOException
- {
+ final short replication, final long blockSize) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
return new HadoopDataOutputStream(fdos);
}
-
@Override
public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
@@ -437,7 +430,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
for (int i = 0; i < files.length; i++) {
files[i] = new HadoopFileStatus(hadoopFiles[i]);
}
-
+
return files;
}
@@ -476,7 +469,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
// }
clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
- if(clazz != null && LOG.isDebugEnabled()) {
+ if (clazz != null && LOG.isDebugEnabled()) {
LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
}
return clazz;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 57eea6f..275e492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -18,17 +18,6 @@
package org.apache.flink.runtime.fs.maprfs;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -40,6 +29,18 @@ import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Concrete implementation of the {@link FileSystem} base class for the MapR
* file system. The class contains MapR specific code to initialize the
@@ -94,7 +95,7 @@ public final class MapRFileSystem extends FileSystem {
/**
* Creates a new MapRFileSystem object to access the MapR file system.
- *
+ *
* @throws IOException
* throw if the required MapR classes cannot be found
*/
@@ -180,8 +181,8 @@ public final class MapRFileSystem extends FileSystem {
}
/**
- * Retrieves the CLDB locations for the given MapR cluster name
- *
+ * Retrieves the CLDB locations for the given MapR cluster name.
+ *
* @param authority
* the name of the MapR cluster
* @return a list of CLDB locations
http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
index 58de3db..21c18bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
@@ -35,6 +35,9 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+/**
+ * Tests for the {@link HadoopDataInputStream}.
+ */
public class HadoopDataInputStreamTest {
private FSDataInputStream verifyInputStream;
[6/7] flink git commit: [FLINK-7039] [build] Increase
forkCountTestPackage for sudo-enabled TravisCI
Posted by ch...@apache.org.
[FLINK-7039] [build] Increase forkCountTestPackage for sudo-enabled TravisCI
The switch from the container-based to the sudo-enabled environment in
TravisCI has increased available memory from 4 GB to 7.5 GB, so use a
forkCount of 2 in all packages, including flink-test.
This closes #4222.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c59537ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c59537ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c59537ee
Branch: refs/heads/master
Commit: c59537eebc7251e3078553d84271597bb720b518
Parents: c9f659e
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Jun 29 06:42:51 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:32 2017 +0200
----------------------------------------------------------------------
tools/travis_mvn_watchdog.sh | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c59537ee/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 9733866..f3c1699 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -44,9 +44,10 @@ SLEEP_TIME=20
LOG4J_PROPERTIES=${HERE}/log4j-travis.properties
# Maven command to run. We set the forkCount manually, because otherwise Maven sees too many cores
-# on the Travis VMs.
+# on the Travis VMs. Set forkCountTestPackage to 1 for container-based environment (4 GiB memory)
+# and 2 for sudo-enabled environment (7.5 GiB memory).
MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=1 -Dmaven.javadoc.skip=true -B $PROFILE $MVN_LOGGING_OPTIONS clean install"
+MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B $PROFILE $MVN_LOGGING_OPTIONS clean install"
MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"
MVN_EXIT="${ARTIFACTS_DIR}/watchdog.mvn.exit"