You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 18:49:42 UTC

[1/5] flink git commit: [FLINK-6715] Activate strict checkstyle for flink-mesos

Repository: flink
Updated Branches:
  refs/heads/master 0e69dd5cc -> 79b869498


http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index bc303e9..1f33cb5 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *	 http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,26 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
 import com.netflix.fenzo.ConstraintEvaluator;
 import com.netflix.fenzo.functions.Func1;
 import com.netflix.fenzo.plugins.HostAttrValueConstraint;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.TestLogger;
 import org.apache.mesos.Protos;
 import org.junit.Test;
-import scala.Option;
 
 import java.util.List;
 
+import scala.Option;
+
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
+/**
+ * Tests for the {@link MesosTaskManagerParameters}.
+ */
 public class MesosTaskManagerParametersTest extends TestLogger {
 
 	@Test
@@ -56,12 +62,12 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		assertEquals(0, MesosTaskManagerParameters.buildVolumes(Option.<String>apply("")).size());
 	}
 
-	@Test(expected=IllegalArgumentException.class)
+	@Test(expected = IllegalArgumentException.class)
 	public void testBuildVolumesBadMode() throws Exception {
 		MesosTaskManagerParameters.buildVolumes(Option.<String>apply("/hp:/cp:RF"));
 	}
 
-	@Test(expected=IllegalArgumentException.class)
+	@Test(expected = IllegalArgumentException.class)
 	public void testBuildVolumesMalformed() throws Exception {
 		MesosTaskManagerParameters.buildVolumes(Option.<String>apply("/hp:/cp:ro:extra"));
 	}
@@ -77,66 +83,65 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		assertEquals("/host/path", params.containerVolumes().get(0).getHostPath());
 		assertEquals(Protos.Volume.Mode.RO, params.containerVolumes().get(0).getMode());
 	}
-	
+
+	@Test
+	public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
+
+		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));
+		assertThat(mesosTaskManagerParameters.constraints().size(), is(2));
+		ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
+			@Override
+			public String call(String s) {
+				return "foo";
+			}
+		});
+		ConstraintEvaluator secondConstraintEvaluator = new HostAttrValueConstraint("az", new Func1<String, String>() {
+			@Override
+			public String call(String s) {
+				return "foo";
+			}
+		});
+		assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
+		assertThat(mesosTaskManagerParameters.constraints().get(1).getName(), is(secondConstraintEvaluator.getName()));
+
+	}
+
+	@Test
+	public void givenOneConstraintInConfigShouldBeParsed() throws Exception {
+
+		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo"));
+		assertThat(mesosTaskManagerParameters.constraints().size(), is(1));
+		ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
+			@Override
+			public String call(String s) {
+				return "foo";
+			}
+		});
+		assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
+	}
+
 	@Test
-    public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
-
-        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));
-        assertThat(mesosTaskManagerParameters.constraints().size(), is(2));
-        ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
-            @Override
-            public String call(String s) {
-                return "foo";
-            }
-        });
-        ConstraintEvaluator secondConstraintEvaluator = new HostAttrValueConstraint("az", new Func1<String, String>() {
-            @Override
-            public String call(String s) {
-                return "foo";
-            }
-        });
-        assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
-        assertThat(mesosTaskManagerParameters.constraints().get(1).getName(), is(secondConstraintEvaluator.getName()));
-
-    }
-
-    @Test
-    public void givenOneConstraintInConfigShouldBeParsed() throws Exception {
-
-        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo"));
-        assertThat(mesosTaskManagerParameters.constraints().size(), is(1));
-        ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
-            @Override
-            public String call(String s) {
-                return "foo";
-            }
-        });
-        assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
-    }
-
-    @Test
-    public void givenEmptyConstraintInConfigShouldBeParsed() throws Exception {
-
-        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(""));
-        assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
-    }
-
-    @Test
-    public void givenInvalidConstraintInConfigShouldBeParsed() throws Exception {
-
-        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(",:,"));
-        assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
-    }
-
-
-    private static Configuration withHardHostAttrConstraintConfiguration(final String configuration) {
-        return new Configuration() {
-            private static final long serialVersionUID = -3249384117909445760L;
-
-            {
-                setString(MesosTaskManagerParameters.MESOS_CONSTRAINTS_HARD_HOSTATTR, configuration);
-            }
-        };
-    }
+	public void givenEmptyConstraintInConfigShouldBeParsed() throws Exception {
+
+		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(""));
+		assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
+	}
+
+	@Test
+	public void givenInvalidConstraintInConfigShouldBeParsed() throws Exception {
+
+		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(",:,"));
+		assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
+	}
+
+	private static Configuration withHardHostAttrConstraintConfiguration(final String configuration) {
+		return new Configuration() {
+			private static final long serialVersionUID = -3249384117909445760L;
+
+			{
+				setString(MesosTaskManagerParameters.MESOS_CONSTRAINTS_HARD_HOSTATTR, configuration);
+			}
+		};
+	}
 
 }


[4/5] flink git commit: [FLINK-4497] [cassandra] Scala Case Classes / Tuple support

Posted by ch...@apache.org.
[FLINK-4497] [cassandra] Scala Case Classes / Tuple support

This closes #2633.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8dfc9f9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8dfc9f9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8dfc9f9d

Branch: refs/heads/master
Commit: 8dfc9f9d0f06e4ea2376b0a58efd623e58735ae5
Parents: d4fba3b
Author: zentol <ch...@apache.org>
Authored: Mon Oct 10 15:30:48 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:58:51 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-cassandra/pom.xml           | 11 +++
 .../cassandra/AbstractCassandraTupleSink.java   | 52 +++++++++++
 .../cassandra/CassandraScalaProductSink.java    | 41 +++++++++
 .../connectors/cassandra/CassandraSink.java     | 91 ++++++++++++++++----
 .../connectors/cassandra/CassandraSinkBase.java | 15 ++--
 .../cassandra/CassandraTupleSink.java           | 27 +-----
 .../cassandra/CassandraConnectorITCase.java     | 55 ++++++++++++
 7 files changed, 245 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index 2722c30..4a720c4 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -99,6 +99,12 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
 			<groupId>com.datastax.cassandra</groupId>
 			<artifactId>cassandra-driver-core</artifactId>
 			<version>${driver.version}</version>
@@ -129,6 +135,11 @@ under the License.
 			</exclusions>
 		</dependency>
 		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
new file mode 100644
index 0000000..7a8d097
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Abstract sink to write tuple-like values into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
+	private final String insertQuery;
+	private transient PreparedStatement ps;
+
+	public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+		super(builder);
+		this.insertQuery = insertQuery;
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		super.open(configuration);
+		this.ps = session.prepare(insertQuery);
+	}
+
+	@Override
+	public ListenableFuture<ResultSet> send(IN value) {
+		Object[] fields = extract(value);
+		return session.executeAsync(ps.bind(fields));
+	}
+
+	protected abstract Object[] extract(IN record);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
new file mode 100644
index 0000000..a975985
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+
+import scala.Product;
+
+/**
+ * Sink to write scala tuples and case classes into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
+ */
+public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
+	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
+		super(insertQuery, builder);
+	}
+
+	@Override
+	protected Object[] extract(IN record) {
+		Object[] al = new Object[record.productArity()];
+		for (int i = 0; i < record.productArity(); i++) {
+			al[i] = record.productElement(i);
+		}
+		return al;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 9f0079f..6a33601 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -22,7 +22,9 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -30,6 +32,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import scala.Product;
 
 /**
  * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
@@ -190,13 +193,31 @@ public class CassandraSink<IN> {
 	 * @param <IN>  input type
 	 * @return CassandraSinkBuilder, to further configure the sink
 	 */
+	public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input) {
+		return addSink(input.javaStream());
+	}
+
+	/**
+	 * Writes a DataStream into a Cassandra database.
+	 *
+	 * @param input input DataStream
+	 * @param <IN>  input type
+	 * @return CassandraSinkBuilder, to further configure the sink
+	 */
 	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
-		if (input.getType() instanceof TupleTypeInfo) {
+		TypeInformation<IN> typeInfo = input.getType();
+		if (typeInfo instanceof TupleTypeInfo) {
 			DataStream<T> tupleInput = (DataStream<T>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
-		} else {
+		}
+		if (typeInfo instanceof PojoTypeInfo) {
 			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
 		}
+		if (typeInfo instanceof CaseClassTypeInfo) {
+			DataStream<Product> productInput = (DataStream<Product>) input;
+			return (CassandraSinkBuilder<IN>) new CassandraScalaProductSinkBuilder<>(productInput, productInput.getType(), productInput.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
+		}
+		throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
 	}
 
 	public abstract static class CassandraSinkBuilder<IN> {
@@ -300,7 +321,16 @@ public class CassandraSink<IN> {
 		 * @return finalized sink
 		 * @throws Exception
 		 */
-		public abstract CassandraSink<IN> build() throws Exception;
+		public CassandraSink<IN> build() throws Exception {
+			sanityCheck();
+			return isWriteAheadLogEnabled
+				? createWriteAheadSink()
+				: createSink();
+		}
+		
+		protected abstract CassandraSink<IN> createSink() throws Exception;
+
+		protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;
 
 		protected void sanityCheck() {
 			if (builder == null) {
@@ -323,15 +353,15 @@ public class CassandraSink<IN> {
 		}
 
 		@Override
-		public CassandraSink<IN> build() throws Exception {
-			sanityCheck();
-			if (isWriteAheadLogEnabled) {
-				return committer == null
-					? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
-					: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
-			} else {
-				return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
-			}
+		public CassandraSink<IN> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
+		}
+
+		@Override
+		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+			return committer == null
+				? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
+				: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
 		}
 	}
 
@@ -349,13 +379,38 @@ public class CassandraSink<IN> {
 		}
 
 		@Override
-		public CassandraSink<IN> build() throws Exception {
-			sanityCheck();
-			if (isWriteAheadLogEnabled) {
-				throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
-			} else {
-				return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+		public CassandraSink<IN> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+		}
+
+		@Override
+		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+			throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+		}
+	}
+
+	public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
+
+		public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
 		}
+
+		@Override
+		public CassandraSink<IN> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraScalaProductSink<IN>(query, builder)).name("Cassandra Sink"));
+		}
+
+		@Override
+		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+			throw new IllegalArgumentException("Exactly-once guarantees can only be provided for flink tuple types.");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index b281525..b1b261e 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @param <IN> Type of the elements emitted by this sink
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
-	protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 	protected transient Cluster cluster;
 	protected transient Session session;
 
@@ -48,7 +48,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	private final AtomicInteger updatesPending = new AtomicInteger();
 
-	protected CassandraSinkBase(ClusterBuilder builder) {
+	CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
 	}
@@ -76,7 +76,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 				}
 				exception = t;
 				
-				LOG.error("Error while sending value.", t);
+				log.error("Error while sending value.", t);
 			}
 		};
 		this.cluster = builder.getCluster();
@@ -107,21 +107,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 					updatesPending.wait();
 				}
 			}
-			
+
+			if (exception != null) {
+				throw new IOException("Error while sending value.", exception);
+			}
 		} finally {
 			try {
 				if (session != null) {
 					session.close();
 				}
 			} catch (Exception e) {
-				LOG.error("Error while closing session.", e);
+				log.error("Error while closing session.", e);
 			}
 			try {
 				if (cluster != null) {
 					cluster.close();
 				}
 			} catch (Exception e) {
-				LOG.error("Error while closing cluster.", e);
+				log.error("Error while closing cluster.", e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
index 0a9ef06..a7ec1df 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -17,39 +17,20 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
 
 /**
- * Flink Sink to save data into a Cassandra cluster.
+ * Sink to write Flink {@link Tuple}s into a Cassandra cluster.
  *
  * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
  */
-public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
-	private final String insertQuery;
-	private transient PreparedStatement ps;
-
+public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
 	public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-		super(builder);
-		this.insertQuery = insertQuery;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		super.open(configuration);
-		this.ps = session.prepare(insertQuery);
+		super(insertQuery, builder);
 	}
 
 	@Override
-	public ListenableFuture<ResultSet> send(IN value) {
-		Object[] fields = extract(value);
-		return session.executeAsync(ps.bind(fields));
-	}
-
-	private Object[] extract(IN record) {
+	protected Object[] extract(IN record) {
 		Object[] al = new Object[record.getArity()];
 		for (int i = 0; i < record.getArity(); i++) {
 			al[i] = record.getField(i);

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 06f3c35..95cd86c 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -31,14 +31,20 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 
 import org.junit.AfterClass;
@@ -49,6 +55,8 @@ import org.junit.Test;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -420,4 +428,51 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	private String injectTableName(String target) {
 		return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
 	}
+
+	@Test
+	public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception {
+		Class<scala.Tuple1<String>> c = (Class<scala.Tuple1<String>>) new scala.Tuple1<>("hello").getClass();
+		Seq<TypeInformation<?>> typeInfos = JavaConverters.asScalaBufferConverter(
+			Collections.<TypeInformation<?>>singletonList(BasicTypeInfo.STRING_TYPE_INFO)).asScala();
+		Seq<String> fieldNames = JavaConverters.asScalaBufferConverter(
+			Collections.singletonList("_1")).asScala();
+
+		CaseClassTypeInfo<scala.Tuple1<String>> typeInfo = new CaseClassTypeInfo<scala.Tuple1<String>>(c, null, typeInfos, fieldNames) {
+			@Override
+			public TypeSerializer<scala.Tuple1<String>> createSerializer(ExecutionConfig config) {
+				return null;
+			}
+		};
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<scala.Tuple1<String>> input = env.fromElements(new scala.Tuple1<>("hello"));
+
+		CassandraSink.CassandraSinkBuilder<scala.Tuple1<String>> sinkBuilder = CassandraSink.addSink(input);
+		assertTrue(sinkBuilder instanceof CassandraSink.CassandraScalaProductSinkBuilder);
+	}
+
+	@Test
+	public void testCassandraScalaTupleAtLeastSink() throws Exception {
+		CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink = new CassandraScalaProductSink<>(INSERT_DATA_QUERY, builder);
+
+		List<scala.Tuple3<String, Integer, Integer>> scalaTupleCollection = new ArrayList<>(20);
+		for (int i = 0; i < 20; i++) {
+			scalaTupleCollection.add(new scala.Tuple3<>(UUID.randomUUID().toString(), i, 0));
+		}
+
+		sink.open(new Configuration());
+		for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) {
+			sink.invoke(value);
+		}
+		sink.close();
+
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		List<Row> rows = rs.all();
+		assertEquals(scalaTupleCollection.size(), rows.size());
+
+		for (Row row : rows) {
+			scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
+		}
+		assertEquals(0, scalaTupleCollection.size());
+	}
 }


[2/5] flink git commit: [FLINK-6715] Activate strict checkstyle for flink-mesos

Posted by ch...@apache.org.
[FLINK-6715] Activate strict checkstyle for flink-mesos

This closes #3988.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bca76ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bca76ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bca76ed

Branch: refs/heads/master
Commit: 0bca76ede8b7447014a7d7ed17633d77ecfafe18
Parents: 0e69dd5
Author: zentol <ch...@apache.org>
Authored: Thu May 25 10:25:24 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:56:26 2017 +0200

----------------------------------------------------------------------
 flink-mesos/pom.xml                             |  35 +++++
 .../main/java/org/apache/flink/mesos/Utils.java |   9 +-
 .../flink/mesos/cli/FlinkMesosSessionCli.java   |  15 +-
 .../clusterframework/LaunchableMesosWorker.java |  35 ++---
 .../MesosApplicationMasterRunner.java           |  65 +++++----
 .../clusterframework/MesosConfigKeys.java       |  17 +--
 .../MesosFlinkResourceManager.java              | 101 +++++++-------
 .../MesosTaskManagerParameters.java             |  71 +++++-----
 .../MesosTaskManagerRunner.java                 |  19 ++-
 .../services/MesosServicesUtils.java            |   3 +
 .../services/StandaloneMesosServices.java       |   3 +-
 .../store/MesosWorkerStore.java                 |  13 +-
 .../store/StandaloneMesosWorkerStore.java       |   3 +-
 .../store/ZooKeeperMesosWorkerStore.java        |  16 ++-
 .../flink/mesos/scheduler/SchedulerProxy.java   |  10 +-
 .../mesos/scheduler/TaskSchedulerBuilder.java   |   2 +-
 .../scheduler/messages/ResourceOffers.java      |   1 +
 .../flink/mesos/util/MesosArtifactResolver.java |   3 +-
 .../flink/mesos/util/MesosArtifactServer.java   |  50 ++++---
 .../flink/mesos/util/MesosConfiguration.java    |  29 ++--
 .../apache/flink/mesos/util/ZooKeeperUtils.java |   6 +-
 .../MesosFlinkResourceManagerTest.java          | 122 +++++++++-------
 .../MesosTaskManagerParametersTest.java         | 139 ++++++++++---------
 23 files changed, 427 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 4dae731..94187ee 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -300,6 +300,41 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 7787e40..308e093 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -20,12 +20,17 @@ package org.apache.flink.mesos;
 
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.net.URL;
 import java.util.Arrays;
 
+import scala.Option;
+
+/**
+ * Collection of utility methods.
+ */
 public class Utils {
 	/**
 	 * Construct a Mesos environment variable.
@@ -53,7 +58,7 @@ public class Utils {
 	 */
 	public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
 		Option<URL> url = resolver.resolve(artifact.dest);
-		if(url.isEmpty()) {
+		if (url.isEmpty()) {
 			throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
index dcce0b8..f6850d2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.mesos.cli;
 
+import org.apache.flink.configuration.Configuration;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.configuration.Configuration;
 
 import java.io.IOException;
 import java.util.Map;
@@ -35,28 +36,30 @@ public class FlinkMesosSessionCli {
 
 	/**
 	 * Decode encoded dynamic properties.
+	 *
 	 * @param dynamicPropertiesEncoded encoded properties produced by the encoding method.
 	 * @return a configuration instance to be merged with the static configuration.
 	 */
 	public static Configuration decodeDynamicProperties(String dynamicPropertiesEncoded) {
 		try {
 			Configuration configuration = new Configuration();
-			if(dynamicPropertiesEncoded != null) {
-				TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {};
-				Map<String,String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
+			if (dynamicPropertiesEncoded != null) {
+				TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {
+				};
+				Map<String, String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
 				for (Map.Entry<String, String> property : props.entrySet()) {
 					configuration.setString(property.getKey(), property.getValue());
 				}
 			}
 			return configuration;
-		}
-		catch(IOException ex) {
+		} catch (IOException ex) {
 			throw new IllegalArgumentException("unreadable encoded properties", ex);
 		}
 	}
 
 	/**
 	 * Encode dynamic properties as a string to be transported as an environment variable.
+	 *
 	 * @param configuration the dynamic properties to encode.
 	 * @return a string to be decoded later.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 04a406f..ce7bb9d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -18,23 +18,23 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.TaskAssignmentResult;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.Collections;
 import java.util.List;
@@ -42,15 +42,17 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 
-import static org.apache.flink.mesos.Utils.variable;
+import scala.Option;
+
 import static org.apache.flink.mesos.Utils.range;
 import static org.apache.flink.mesos.Utils.ranges;
 import static org.apache.flink.mesos.Utils.scalar;
+import static org.apache.flink.mesos.Utils.variable;
 
 /**
  * Implements the launch of a Mesos worker.
  *
- * Translates the abstract {@link ContainerSpecification} into a concrete
+ * <p>Translates the abstract {@link ContainerSpecification} into a concrete
  * Mesos-specific {@link Protos.TaskInfo}.
  */
 public class LaunchableMesosWorker implements LaunchableTask {
@@ -59,9 +61,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	/**
 	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
 	 */
-	private static String[] TM_PORT_KEYS = {
+	private static final String[] TM_PORT_KEYS = {
 		"taskmanager.rpc.port",
-		"taskmanager.data.port" };
+		"taskmanager.data.port"};
 
 	private final MesosArtifactResolver resolver;
 	private final ContainerSpecification containerSpec;
@@ -88,7 +90,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		this.params = Preconditions.checkNotNull(params);
 		this.taskID = Preconditions.checkNotNull(taskID);
 		this.mesosConfiguration = Preconditions.checkNotNull(mesosConfiguration);
-		
+
 		this.taskRequest = new Request();
 	}
 
@@ -204,7 +206,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		//configure task manager hostname property if hostname override property is supplied
 		Option<String> taskManagerHostnameOption = params.getTaskManagerHostname();
 
-		if(taskManagerHostnameOption.isDefined()) {
+		if (taskManagerHostnameOption.isDefined()) {
 			// replace the TASK_ID pattern by the actual task id value of the Mesos task
 			final String taskManagerHostname = MesosTaskManagerParameters.TASK_ID_PATTERN
 				.matcher(taskManagerHostnameOption.get())
@@ -225,7 +227,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		}
 
 		// ship additional files
-		for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
+		for (ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
 			cmd.addUris(Utils.uri(resolver, artifact));
 		}
 
@@ -271,9 +273,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		// in event that no docker image or mesos image name is specified, we must still
 		// set type to MESOS
 		containerInfo.setType(Protos.ContainerInfo.Type.MESOS);
-		switch(params.containerType()) {
+		switch (params.containerType()) {
 			case MESOS:
-				if(params.containerImageName().isDefined()) {
+				if (params.containerImageName().isDefined()) {
 					containerInfo
 						.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
 							.setImage(Protos.Image.newBuilder()
@@ -285,7 +287,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 			case DOCKER:
 				assert(params.containerImageName().isDefined());
-					containerInfo
+				containerInfo
 					.setType(Protos.ContainerInfo.Type.DOCKER)
 					.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
 						.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
@@ -300,7 +302,6 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		containerInfo.addAllVolumes(params.containerVolumes());
 		taskInfo.setContainer(containerInfo);
 
-
 		return taskInfo.build();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 09ef380..fc75bd7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.Props;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -62,15 +52,19 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-import org.apache.mesos.Protos;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Option;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
@@ -82,6 +76,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -89,24 +87,26 @@ import static org.apache.flink.util.Preconditions.checkState;
  * It starts actor system and the actors for {@link JobManager}
  * and {@link MesosFlinkResourceManager}.
  *
- * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * <p>The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
  * allocation and failure detection.
  */
 public class MesosApplicationMasterRunner {
-	/** Logger */
+
 	protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
 
-	/** The maximum time that TaskManagers may be waiting to register at the JobManager,
-	 * before they quit */
+	/**
+	 * The maximum time that TaskManagers may be waiting to register at the JobManager,
+	 * before they quit.
+	 */
 	private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
 
-	/** The process environment variables */
+	/** The process environment variables. */
 	private static final Map<String, String> ENV = System.getenv();
 
-	/** The exit code returned if the initialization of the application master failed */
+	/** The exit code returned if the initialization of the application master failed. */
 	private static final int INIT_ERROR_EXIT_CODE = 31;
 
-	/** The exit code returned if the process exits because a critical actor died */
+	/** The exit code returned if the process exits because a critical actor died. */
 	private static final int ACTOR_DIED_EXIT_CODE = 32;
 
 	// ------------------------------------------------------------------------
@@ -142,7 +142,7 @@ public class MesosApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the Mesos AppMaster. Obtains user group
-	 * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
+	 * information and calls the main work method {@link #runPrivileged(Configuration, Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
@@ -318,7 +318,6 @@ public class MesosApplicationMasterRunner {
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
-
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
@@ -328,7 +327,7 @@ public class MesosApplicationMasterRunner {
 				actorSystem,
 				jobManager,
 				LOG);
-			if(webMonitor != null) {
+			if (webMonitor != null) {
 				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
 				mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
 			}
@@ -380,7 +379,7 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			if(artifactServer != null) {
+			if (artifactServer != null) {
 				try {
 					artifactServer.stop();
 				} catch (Throwable ignored) {
@@ -396,7 +395,7 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			if(futureExecutor != null) {
+			if (futureExecutor != null) {
 				try {
 					futureExecutor.shutdownNow();
 				} catch (Throwable tt) {
@@ -404,7 +403,7 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			if(ioExecutor != null) {
+			if (ioExecutor != null) {
 				try {
 					ioExecutor.shutdownNow();
 				} catch (Throwable tt) {
@@ -493,7 +492,7 @@ public class MesosApplicationMasterRunner {
 			.setHostname(hostname);
 		Protos.Credential.Builder credential = null;
 
-		if(!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
+		if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
 			throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
 		}
 		String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
@@ -517,7 +516,7 @@ public class MesosApplicationMasterRunner {
 			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
 			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
 
-		if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+		if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
 			frameworkInfo.setPrincipal(flinkConfig.getString(
 				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
 
@@ -526,7 +525,7 @@ public class MesosApplicationMasterRunner {
 
 			// some environments use a side-channel to communicate the secret to Mesos,
 			// and thus don't set the 'secret' configuration setting
-			if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+			if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
 				credential.setSecret(flinkConfig.getString(
 					ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
 			}
@@ -545,7 +544,7 @@ public class MesosApplicationMasterRunner {
 	 * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
 	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
 	 * A lightweight HTTP server serves the artifacts to the fetcher.
-     */
+	 */
 	private static void applyOverlays(
 		Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
 
@@ -565,7 +564,7 @@ public class MesosApplicationMasterRunner {
 
 	private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
 		// serve the artifacts associated with the container environment
-		for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+		for (ContainerSpecification.Artifact artifact : container.getArtifacts()) {
 			server.addPath(artifact.source, artifact.dest);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index 35da95f..1af94e2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -27,30 +27,31 @@ public class MesosConfigKeys {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * The Mesos task ID, used by the TM for informational purposes
+	 * The Mesos task ID, used by the TM for informational purposes.
 	 */
 	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 
 	/**
-	 * Reserved for future enhancement
+	 * Reserved for future enhancement.
 	 */
 	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
 
 	/**
-	 * JVM arguments, used by the JM and TM
+	 * JVM arguments, used by the JM and TM.
 	 */
 	public static final String ENV_JVM_ARGS = "JVM_ARGS";
 
 	/**
-	 * Standard environment variables used in DCOS environment
+	 * Standard environment variables used in DCOS environment.
 	 */
 	public static final String ENV_TASK_NAME = "TASK_NAME";
 
 	/**
- 	 * Standard environment variables used in DCOS environment
- 	 */
+	 * Standard environment variables used in DCOS environment.
+	 */
 	public static final String ENV_FRAMEWORK_NAME = "FRAMEWORK_NAME";
 
-	/** Private constructor to prevent instantiation */
-	private MesosConfigKeys() {}
+	/** Private constructor to prevent instantiation. */
+	private MesosConfigKeys() {
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 17ffef7..6c708fa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -18,20 +18,14 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
 import org.apache.flink.mesos.scheduler.SchedulerProxy;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
@@ -48,17 +42,23 @@ import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.FrameworkInfo;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -66,6 +66,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -73,25 +75,25 @@ import static java.util.Objects.requireNonNull;
  */
 public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
 
-	/** The Mesos configuration (master and framework info) */
+	/** The Mesos configuration (master and framework info). */
 	private final MesosConfiguration mesosConfig;
 
-	/** The TaskManager container parameters (like container memory size) */
+	/** The TaskManager container parameters (like container memory size). */
 	private final MesosTaskManagerParameters taskManagerParameters;
 
-	/** Container specification for launching a TM */
+	/** Container specification for launching a TM. */
 	private final ContainerSpecification taskManagerContainerSpec;
 
-	/** Resolver for HTTP artifacts **/
+	/** Resolver for HTTP artifacts. **/
 	private final MesosArtifactResolver artifactResolver;
 
 	/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
 	private final int maxFailedTasks;
 
-	/** Callback handler for the asynchronous Mesos scheduler */
+	/** Callback handler for the asynchronous Mesos scheduler. */
 	private SchedulerProxy schedulerCallbackHandler;
 
-	/** Mesos scheduler driver */
+	/** Mesos scheduler driver. */
 	private SchedulerDriver schedulerDriver;
 
 	private ActorRef connectionMonitor;
@@ -104,12 +106,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 	private final MesosWorkerStore workerStore;
 
-	/** planning state related to workers - package private for unit test purposes */
+	/** planning state related to workers - package private for unit test purposes. */
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
 
-	/** The number of failed tasks since the master became active */
+	/** The number of failed tasks since the master became active. */
 	private int failedTasksSoFar;
 
 	public MesosFlinkResourceManager(
@@ -158,7 +160,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			.setCheckpoint(true);
 
 		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
-		if(frameworkID.isEmpty()) {
+		if (frameworkID.isEmpty()) {
 			LOG.info("Registering as new framework.");
 		}
 		else {
@@ -248,7 +250,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
 			taskTerminated(msg.taskID(), msg.status());
 
-		} else  {
+		} else {
 			// message handled by the generic resource master code
 			super.handleMessage(message);
 		}
@@ -267,15 +269,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		try {
 			// unregister the framework, which implicitly removes all tasks.
 			schedulerDriver.stop(false);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			LOG.warn("unable to unregister the framework", ex);
 		}
 
 		try {
 			workerStore.stop(true);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			LOG.warn("unable to stop the worker state store", ex);
 		}
 
@@ -308,13 +308,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		if (!tasksFromPreviousAttempts.isEmpty()) {
 			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
 
-			List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
 			List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
 
 			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
 				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
 
-				switch(worker.state()) {
+				switch (worker.state()) {
 					case New:
 						workersInNew.put(extractResourceID(worker.taskID()), worker);
 						toLaunch.add(launchable);
@@ -331,11 +331,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			}
 
 			// tell the launch coordinator about prior assignments
-			if(toAssign.size() >= 1) {
+			if (toAssign.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
 			}
 			// tell the launch coordinator to launch any new tasks
-			if(toLaunch.size() >= 1) {
+			if (toLaunch.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
 			}
 		}
@@ -374,11 +374,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			}
 
 			// tell the launch coordinator to launch the new tasks
-			if(toLaunch.size() >= 1) {
+			if (toLaunch.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
 			}
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to request new workers", ex);
 		}
 	}
@@ -386,7 +385,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	/**
 	 * Accept offers as advised by the launch coordinator.
 	 *
-	 * Acceptance is routed through the RM to update the persistent state before
+	 * <p>Acceptance is routed through the RM to update the persistent state before
 	 * forwarding the message to Mesos.
 	 */
 	private void acceptOffers(AcceptOffers msg) {
@@ -421,15 +420,14 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 			// send the acceptance message to Mesos
 			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to accept offers", ex);
 		}
 	}
 
 	/**
 	 * Handle a task status change.
-     */
+	 */
 	private void taskStatusUpdated(StatusUpdate message) {
 		taskRouter.tell(message, self());
 		reconciliationCoordinator.tell(message, self());
@@ -470,9 +468,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			if (worker != null) {
 				LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
 				accepted.add(new RegisteredMesosWorkerNode(worker));
-			}
-			else {
-				if(isStarted(resourceID)) {
+			} else {
+				if (isStarted(resourceID)) {
 					LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
 				}
 				else {
@@ -549,8 +546,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 		try {
 			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to store the assigned framework ID", ex);
 			return;
 		}
@@ -598,13 +594,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		boolean existed;
 		try {
 			existed = workerStore.removeWorker(taskID);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to remove worker", ex);
 			return;
 		}
 
-		if(!existed) {
+		if (!existed) {
 			LOG.info("Received a termination notice for an unrecognized worker: {}", id);
 			return;
 		}
@@ -695,11 +690,15 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	 * @return goal state information for the {@Link TaskMonitor}.
 	 */
 	static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
-		switch(worker.state()) {
-			case New: return new TaskMonitor.New(worker.taskID());
-			case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
-			case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
-			default: throw new IllegalArgumentException("unsupported worker state");
+		switch (worker.state()) {
+			case New:
+				return new TaskMonitor.New(worker.taskID());
+			case Launched:
+				return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+			case Released:
+				return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+			default:
+				throw new IllegalArgumentException("unsupported worker state");
 		}
 	}
 
@@ -727,7 +726,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	/**
 	 * Creates the props needed to instantiate this actor.
 	 *
-	 * Rather than extracting and validating parameters in the constructor, this factory method takes
+	 * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
 	 * care of that. That way, errors occur synchronously, and are not swallowed simply in a
 	 * failed asynchronous attempt to start the actor.
 
@@ -746,7 +745,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	 *
 	 * @return The Props object to instantiate the MesosFlinkResourceManager actor.
 	 */
-	public static Props createActorProps(Class<? extends MesosFlinkResourceManager> actorClass,
+	public static Props createActorProps(
+			Class<? extends MesosFlinkResourceManager> actorClass,
 			Configuration flinkConfig,
 			MesosConfiguration mesosConfig,
 			MesosWorkerStore workerStore,
@@ -754,8 +754,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			MesosTaskManagerParameters taskManagerParameters,
 			ContainerSpecification taskManagerContainerSpec,
 			MesosArtifactResolver artifactResolver,
-			Logger log)
-	{
+			Logger log) {
 
 		final int numInitialTaskManagers = flinkConfig.getInteger(
 			ConfigConstants.MESOS_INITIAL_TASKS, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 4324469..f5a415e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -18,34 +18,36 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.functions.Func1;
-import com.netflix.fenzo.plugins.HostAttrValueConstraint;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.functions.Func1;
+import com.netflix.fenzo.plugins.HostAttrValueConstraint;
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import scala.Option;
+
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class describes the Mesos-specific parameters for launching a TaskManager process.
  *
- * These parameters are in addition to the common parameters
+ * <p>These parameters are in addition to the common parameters
  * provided by {@link ContaineredTaskManagerParameters}.
  */
 public class MesosTaskManagerParameters {
 
-	/** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task */
+	/** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task. */
 	public static final Pattern TASK_ID_PATTERN = Pattern.compile("_TASK_", Pattern.LITERAL);
 
 	public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
@@ -75,11 +77,11 @@ public class MesosTaskManagerParameters {
 	public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
 		key("mesos.resourcemanager.tasks.bootstrap-cmd")
 		.noDefaultValue();
-	
+
 	public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
 		key("mesos.resourcemanager.tasks.container.volumes")
 		.noDefaultValue();
-	
+
 	public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
 		key("mesos.constraints.hard.hostattribute")
 		.noDefaultValue();
@@ -102,9 +104,9 @@ public class MesosTaskManagerParameters {
 	private final ContaineredTaskManagerParameters containeredParameters;
 
 	private final List<Protos.Volume> containerVolumes;
-	
+
 	private final List<ConstraintEvaluator> constraints;
-	
+
 	private final Option<String> bootstrapCommand;
 
 	private final Option<String> taskManagerHostname;
@@ -129,10 +131,9 @@ public class MesosTaskManagerParameters {
 		this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
 	}
 
-
-    /**
+	/**
 	 * Get the CPU units to use for the TaskManager process.
-     */
+	 */
 	public double cpus() {
 		return cpus;
 	}
@@ -140,49 +141,53 @@ public class MesosTaskManagerParameters {
 	/**
 	 * Get the container type (Mesos or Docker).  The default is Mesos.
 	 *
-	 * Mesos provides a facility for a framework to specify which containerizer to use.
-     */
+	 * <p>Mesos provides a facility for a framework to specify which containerizer to use.
+	 */
 	public ContainerType containerType() {
 		return containerType;
 	}
 
 	/**
 	 * Get the container image name.
-     */
+	 */
 	public Option<String> containerImageName() {
 		return containerImageName;
 	}
 
 	/**
 	 * Get the common containered parameters.
-     */
+	 */
 	public ContaineredTaskManagerParameters containeredParameters() {
 		return containeredParameters;
 	}
 
 	/**
-	 * Get the container volumes string
+	 * Get the container volumes string.
 	 */
 	public List<Protos.Volume> containerVolumes() {
 		return containerVolumes;
 	}
 
 	/**
-	 * Get the placement constraints
+	 * Get the placement constraints.
 	 */
 	public List<ConstraintEvaluator> constraints() {
 		return constraints;
 	}
 
 	/**
- 	 * Get the taskManager hostname.
- 	 */
-	public Option<String> getTaskManagerHostname() { return taskManagerHostname; }
+	 * Get the taskManager hostname.
+	 */
+	public Option<String> getTaskManagerHostname() {
+		return taskManagerHostname;
+	}
 
 	/**
- 	 * Get the bootstrap command.
- 	 */
-	public Option<String> bootstrapCommand() { return bootstrapCommand;	}	
+	 * Get the bootstrap command.
+	 */
+	public Option<String> bootstrapCommand() {
+		return bootstrapCommand;
+	}
 
 	@Override
 	public String toString() {
@@ -200,8 +205,9 @@ public class MesosTaskManagerParameters {
 
 	/**
 	 * Create the Mesos TaskManager parameters.
+	 *
 	 * @param flinkConfig the TM configuration.
-     */
+	 */
 	public static MesosTaskManagerParameters create(Configuration flinkConfig) {
 
 		List<ConstraintEvaluator> constraints = parseConstraints(flinkConfig.getString(MESOS_CONSTRAINTS_HARD_HOSTATTR));
@@ -212,7 +218,7 @@ public class MesosTaskManagerParameters {
 			flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
 
 		double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
-		if(cpus <= 0.0) {
+		if (cpus <= 0.0) {
 			cpus = Math.max(containeredParameters.numSlots(), 1.0);
 		}
 
@@ -221,13 +227,13 @@ public class MesosTaskManagerParameters {
 
 		ContainerType containerType;
 		String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
-		switch(containerTypeString) {
+		switch (containerTypeString) {
 			case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
 				containerType = ContainerType.MESOS;
 				break;
 			case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
 				containerType = ContainerType.DOCKER;
-				if(imageName == null || imageName.length() == 0) {
+				if (imageName == null || imageName.length() == 0) {
 					throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
 						" must be specified for docker container type");
 				}
@@ -250,7 +256,7 @@ public class MesosTaskManagerParameters {
 			cpus,
 			containerType,
 			Option.apply(imageName),
-			containeredParameters,			
+			containeredParameters,
 			containerVolumes,
 			constraints,
 			tmBootstrapCommand,
@@ -287,7 +293,7 @@ public class MesosTaskManagerParameters {
 			}
 		}));
 	}
-	
+
 	/**
 	 * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
 	 *
@@ -341,6 +347,9 @@ public class MesosTaskManagerParameters {
 		}
 	}
 
+	/**
+	 * The supported containerizers.
+	 */
 	public enum ContainerType {
 		MESOS,
 		DOCKER

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 625880b..e1b0efa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -36,14 +28,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
 /**
  * The entry point for running a TaskManager in a Mesos container.
  */
@@ -59,7 +58,7 @@ public class MesosTaskManagerRunner {
 				.addOption(BootstrapTools.newDynamicPropertiesOption());
 	}
 
-	/** The process environment variables */
+	/** The process environment variables. */
 	private static final Map<String, String> ENV = System.getenv();
 
 	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index 0ac5f4e..a28020a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
 import org.apache.flink.util.ConfigurationUtil;
 
+/**
+ * Utilities for the {@link MesosServices}.
+ */
 public class MesosServicesUtils {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
index dfbc2c3..aa3157f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -35,5 +35,6 @@ public class StandaloneMesosServices implements MesosServices {
 	}
 
 	@Override
-	public void close(boolean cleanup) throws Exception {}
+	public void close(boolean cleanup) throws Exception {
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index d6ff6bc..f1f54ce 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -19,13 +19,14 @@
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.util.List;
 import java.util.Objects;
 
+import scala.Option;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -83,7 +84,7 @@ public interface MesosWorkerStore {
 	/**
 	 * A stored worker.
 	 *
-	 * The assigned slaveid/hostname is valid in Launched and Released states.  The hostname is needed
+	 * <p>The assigned slaveid/hostname is valid in Launched and Released states.  The hostname is needed
 	 * by Fenzo for optimization purposes.
 	 */
 	class Worker implements Serializable {
@@ -112,28 +113,28 @@ public interface MesosWorkerStore {
 
 		/**
 		 * Get the worker's task ID.
-         */
+		 */
 		public Protos.TaskID taskID() {
 			return taskID;
 		}
 
 		/**
 		 * Get the worker's assigned slave ID.
-         */
+		 */
 		public Option<Protos.SlaveID> slaveID() {
 			return slaveID;
 		}
 
 		/**
 		 * Get the worker's assigned hostname.
-         */
+		 */
 		public Option<String> hostname() {
 			return hostname;
 		}
 
 		/**
 		 * Get the worker's state.
-         */
+		 */
 		public WorkerState state() {
 			return state;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
index c6ccbee..b43b89c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
@@ -19,13 +19,14 @@
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 /**
  * A standalone Mesos worker store.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 4544b8e..92e4416 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -26,11 +26,11 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
 import org.apache.flink.util.FlinkException;
+
 import org.apache.mesos.Protos;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,6 +38,8 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 
+import scala.Option;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -53,13 +55,13 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 	/** Flag indicating whether this instance is running. */
 	private boolean isRunning;
 
-	/** A persistent value of the assigned framework ID */
+	/** A persistent value of the assigned framework ID. */
 	private final ZooKeeperSharedValue frameworkIdInZooKeeper;
 
-	/** A persistent count of all tasks created, for generating unique IDs */
+	/** A persistent count of all tasks created, for generating unique IDs. */
 	private final ZooKeeperSharedCount totalTaskCountInZooKeeper;
 
-	/** A persistent store of serialized workers */
+	/** A persistent store of serialized workers. */
 	private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper;
 
 	@SuppressWarnings("unchecked")
@@ -69,7 +71,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		ZooKeeperSharedCount totalTaskCountInZooKeeper) throws Exception {
 		this.workersInZooKeeper = checkNotNull(workersInZooKeeper, "workersInZooKeeper");
 		this.frameworkIdInZooKeeper = checkNotNull(frameworkIdInZooKeeper, "frameworkIdInZooKeeper");
-		this.totalTaskCountInZooKeeper= checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
+		this.totalTaskCountInZooKeeper = checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
 	}
 
 	@Override
@@ -89,7 +91,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 				frameworkIdInZooKeeper.close();
 				totalTaskCountInZooKeeper.close();
 
-				if(cleanup) {
+				if (cleanup) {
 					workersInZooKeeper.releaseAndTryRemoveAll();
 				}
 
@@ -237,7 +239,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		synchronized (startStopLock) {
 			verifyIsRunning();
 
-			if(workersInZooKeeper.exists(path) == -1) {
+			if (workersInZooKeeper.exists(path) == -1) {
 				LOG.debug("No such worker {} in ZooKeeper.", taskID);
 				return false;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
index b6d3383..0ac05c1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.mesos.scheduler;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.mesos.scheduler.messages.Disconnected;
 import org.apache.flink.mesos.scheduler.messages.Error;
 import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
@@ -30,6 +28,8 @@ import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.SlaveLost;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+
+import akka.actor.ActorRef;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
@@ -39,14 +39,14 @@ import java.util.List;
 /**
  * This class reacts to callbacks from the Mesos scheduler driver.
  *
- * In order to preserve actor concurrency safety, this class simply sends
+ * <p>In order to preserve actor concurrency safety, this class simply sends
  * corresponding messages to the Mesos resource master actor.
  *
- * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ * <p>See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
  */
 public class SchedulerProxy implements Scheduler {
 
-	/** The actor to which we report the callbacks */
+	/** The actor to which we report the callbacks. */
 	private final ActorRef mesosActor;
 
 	public SchedulerProxy(ActorRef mesosActor) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
index c841e22..a5653e1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
@@ -25,7 +25,7 @@ import com.netflix.fenzo.functions.Action1;
 /**
  * A builder for the Fenzo task scheduler.
  *
- * Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
+ * <p>Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
  */
 public interface TaskSchedulerBuilder {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
index dc5111d..8ca7e42 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
@@ -22,6 +22,7 @@ import org.apache.mesos.Protos;
 
 import java.io.Serializable;
 import java.util.List;
+
 import static java.util.Objects.requireNonNull;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
index a6a26dc..c62ab84 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -19,10 +19,11 @@
 package org.apache.flink.mesos.util;
 
 import org.apache.flink.core.fs.Path;
-import scala.Option;
 
 import java.net.URL;
 
+import scala.Option;
+
 /**
  * An interface for resolving artifact URIs.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index ae826db..967d818 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.mesos.util;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.net.SSLUtils;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
@@ -42,25 +50,17 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedStream;
-
 import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.net.SSLUtils;
 import org.jets3t.service.utils.Mimetypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -69,6 +69,8 @@ import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 
+import scala.Option;
+
 import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -81,11 +83,10 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
-
 /**
  * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher.
  *
- * More information:
+ * <p>More information:
  * http://mesos.apache.org/documentation/latest/fetcher/
  * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
  */
@@ -101,7 +102,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 	private final URL baseURL;
 
-	private final Map<Path,URL> paths = new HashMap<>();
+	private final Map<Path, URL> paths = new HashMap<>();
 
 	private final SSLContext serverSSLContext;
 
@@ -153,7 +154,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 			}
 		};
 
-		NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
+		NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
 		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 
 		this.bootstrap = new ServerBootstrap();
@@ -169,7 +170,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 		String address = bindAddress.getAddress().getHostAddress();
 		int port = bindAddress.getPort();
 
-		String httpProtocol = (serverSSLContext != null) ? "https": "http";
+		String httpProtocol = (serverSSLContext != null) ? "https" : "http";
 
 		baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
 
@@ -201,7 +202,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 	 * @param remoteFile the remote path with which to locate the file.
 	 * @return the fully-qualified remote path to the file.
 	 * @throws MalformedURLException if the remote path is invalid.
-     */
+	 */
 	public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException {
 		return addPath(new Path(localFile.toURI()), new Path(remoteFile));
 	}
@@ -214,10 +215,10 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 	 * @throws MalformedURLException if the remote path is invalid.
 	 */
 	public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException {
-		if(paths.containsKey(remoteFile)) {
+		if (paths.containsKey(remoteFile)) {
 			throw new IllegalArgumentException("duplicate path registered");
 		}
-		if(remoteFile.isAbsolute()) {
+		if (remoteFile.isAbsolute()) {
 			throw new IllegalArgumentException("not expecting an absolute path");
 		}
 		URL fileURL = new URL(baseURL, remoteFile.toString());
@@ -229,7 +230,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 	}
 
 	public synchronized void removePath(Path remoteFile) {
-		if(paths.containsKey(remoteFile)) {
+		if (paths.containsKey(remoteFile)) {
 			URL fileURL = null;
 			try {
 				fileURL = new URL(baseURL, remoteFile.toString());
@@ -274,11 +275,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 		public VirtualFileServerHandler(Path path) throws IOException {
 			this.path = path;
-			if(!path.isAbsolute()) {
+			if (!path.isAbsolute()) {
 				throw new IllegalArgumentException("path must be absolute: " + path.toString());
 			}
 			this.fs = path.getFileSystem();
-			if(!fs.exists(path) || fs.getFileStatus(path).isDir()) {
+			if (!fs.exists(path) || fs.getFileStatus(path).isDir()) {
 				throw new IllegalArgumentException("no such file: " + path.toString());
 			}
 		}
@@ -292,12 +293,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 				LOG.debug("{} request for file '{}'", request.getMethod(), path);
 			}
 
-			if(!(request.getMethod() == GET || request.getMethod() == HEAD)) {
+			if (!(request.getMethod() == GET || request.getMethod() == HEAD)) {
 				sendMethodNotAllowed(ctx);
 				return;
 			}
 
-
 			final FileStatus status;
 			try {
 				status = fs.getFileStatus(path);
@@ -322,8 +322,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 				final FSDataInputStream stream = fs.open(path);
 				try {
 					ctx.write(new ChunkedStream(stream));
-				}
-				catch(Exception e) {
+				} catch (Exception e) {
 					stream.close();
 					throw e;
 				}
@@ -369,7 +368,6 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 		}
 	}
 
-
 	/**
 	 * Handle a request for a non-existent file.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
index 40dc41c..7660e9c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
@@ -23,10 +23,11 @@ import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
-import scala.Option;
 
 import java.util.Map;
 
+import scala.Option;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -53,7 +54,7 @@ public class MesosConfiguration {
 	/**
 	 * The Mesos connection string.
 	 *
-	 * The value should be in one of the following forms:
+	 * <p>The value should be in one of the following forms:
 	 * <pre>
 	 * {@code
 	 *     host:port
@@ -62,28 +63,28 @@ public class MesosConfiguration {
 	 *     file:///path/to/file (where file contains one of the above)
 	 * }
 	 * </pre>
-     */
+	 */
 	public String masterUrl() {
 		return masterUrl;
 	}
 
 	/**
 	 * The framework registration info.
-     */
+	 */
 	public Protos.FrameworkInfo.Builder frameworkInfo() {
 		return frameworkInfo;
 	}
 
 	/**
 	 * The credential to authenticate the framework principal.
-     */
+	 */
 	public Option<Protos.Credential.Builder> credential() {
 		return credential;
 	}
 
 	/**
 	 * Revise the configuration with updated framework info.
-     */
+	 */
 	public MesosConfiguration withFrameworkInfo(Protos.FrameworkInfo.Builder frameworkInfo) {
 		return new MesosConfiguration(masterUrl, frameworkInfo, credential);
 	}
@@ -92,11 +93,11 @@ public class MesosConfiguration {
 	 * Create the Mesos scheduler driver based on this configuration.
 	 * @param scheduler the scheduler to use.
 	 * @param implicitAcknowledgements whether to configure the driver for implicit acknowledgements.
-     * @return a scheduler driver.
-     */
+	 * @return a scheduler driver.
+	 */
 	public SchedulerDriver createDriver(Scheduler scheduler, boolean implicitAcknowledgements) {
 		MesosSchedulerDriver schedulerDriver;
-		if(this.credential().isDefined()) {
+		if (this.credential().isDefined()) {
 			schedulerDriver =
 				new MesosSchedulerDriver(scheduler, frameworkInfo.build(), this.masterUrl(),
 					implicitAcknowledgements, this.credential().get().build());
@@ -119,11 +120,11 @@ public class MesosConfiguration {
 	}
 
 	/**
-	 * A utility method to log relevant Mesos connection info
-     */
+	 * A utility method to log relevant Mesos connection info.
+	 */
 	public static void logMesosConfig(Logger log, MesosConfiguration config) {
 
-		Map<String,String> env = System.getenv();
+		Map<String, String> env = System.getenv();
 		Protos.FrameworkInfo.Builder info = config.frameworkInfo();
 
 		log.info("--------------------------------------------------------------------------------");
@@ -137,10 +138,10 @@ public class MesosConfiguration {
 		log.info("    Role: {}", info.hasRole() ? info.getRole() : "(none)");
 		log.info("    Principal: {}", info.hasPrincipal() ? info.getPrincipal() : "(none)");
 		log.info("    Host: {}", info.hasHostname() ? info.getHostname() : "(none)");
-		if(env.containsKey("LIBPROCESS_IP")) {
+		if (env.containsKey("LIBPROCESS_IP")) {
 			log.info("    LIBPROCESS_IP: {}", env.get("LIBPROCESS_IP"));
 		}
-		if(env.containsKey("LIBPROCESS_PORT")) {
+		if (env.containsKey("LIBPROCESS_PORT")) {
 			log.info("    LIBPROCESS_PORT: {}", env.get("LIBPROCESS_PORT"));
 		}
 		log.info("    Web UI: {}", info.hasWebuiUrl() ? info.getWebuiUrl() : "(none)");

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
index 6892a65..4211642 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.mesos.util;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Utilities for Zookeeper.
+ */
 public class ZooKeeperUtils {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7ab4e40..af3f7ef 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,29 +18,33 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.*;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
 import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.messages.*;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -50,27 +54,43 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
-import org.apache.mesos.SchedulerDriver;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.netflix.fenzo.ConstraintEvaluator;
+import junit.framework.AssertionFailedError;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 
-import static java.util.Collections.singletonList;
+import scala.Option;
 
+import static java.util.Collections.singletonList;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * General tests for the Mesos resource manager component.
@@ -125,13 +145,24 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 		}
 
 		@Override
-		protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); }
+		protected ActorRef createConnectionMonitor() {
+			return connectionMonitor.ref();
+		}
+
 		@Override
-		protected ActorRef createTaskRouter() { return taskRouter.ref(); }
+		protected ActorRef createTaskRouter() {
+			return taskRouter.ref();
+		}
+
 		@Override
-		protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); }
+		protected ActorRef createLaunchCoordinator() {
+			return launchCoordinator.ref();
+		}
+
 		@Override
-		protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); }
+		protected ActorRef createReconciliationCoordinator() {
+			return reconciliationCoordinator.ref();
+		}
 
 		@Override
 		protected void fatalError(String message, Throwable error) {
@@ -214,11 +245,11 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 			ContaineredTaskManagerParameters containeredParams =
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
-				1.0, 
-				MesosTaskManagerParameters.ContainerType.MESOS, 
-				Option.<String>empty(), 
-				containeredParams, 
-				Collections.<Protos.Volume>emptyList(), 
+				1.0,
+				MesosTaskManagerParameters.ContainerType.MESOS,
+				Option.<String>empty(),
+				containeredParams,
+				Collections.<Protos.Volume>emptyList(),
 				Collections.<ConstraintEvaluator>emptyList(),
 				Option.<String>empty(),
 				Option.<String>empty());
@@ -301,8 +332,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
 
 						register(Collections.<ResourceID>emptyList());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -331,8 +361,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
 						register(singletonList(extractResourceID(task1)));
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -364,8 +393,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 
 						// verify that the internal state was updated
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -409,8 +437,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						// verify that the instance state was updated
 						assertThat(resourceManagerInstance.workersBeingReturned.entrySet(), empty());
 						verify(workerStore).removeWorker(task1);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -442,8 +469,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
 						resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -508,8 +534,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.taskRouter.expectMsg(
 							new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched)));
 						verify(schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -567,8 +592,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						// verify that the instance state was updated
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
 						verify(workerStore).newTaskID();
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -604,8 +628,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
 						expectMsgClass(ResourceRemoved.class);
 						verify(workerStore).newTaskID();
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -632,8 +655,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						verify(schedulerDriver).stop(false);
 						verify(workerStore).stop(true);
 						expectTerminated(resourceManager.actor());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -665,8 +687,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Registered.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(Registered.class);
 						resourceManagerInstance.taskRouter.expectMsgClass(Registered.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -697,8 +718,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(ReRegistered.class);
 						resourceManagerInstance.taskRouter.expectMsgClass(ReRegistered.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -726,8 +746,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Disconnected.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(Disconnected.class);
 						resourceManagerInstance.taskRouter.expectMsgClass(Disconnected.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -752,8 +771,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						watch(resourceManager.actor());
 						resourceManager.tell(new Error("test"), resourceManager);
 						expectTerminated(resourceManager.actor());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}


[3/5] flink git commit: [FLINK-6642] Return -1 in EnvInfo#getOpenFileHandlesLimit

Posted by ch...@apache.org.
[FLINK-6642] Return -1 in EnvInfo#getOpenFileHandlesLimit

This closes #3956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4fba3b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4fba3b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4fba3b6

Branch: refs/heads/master
Commit: d4fba3b674fc67db198e4cf163de6ca44b5caf8e
Parents: 0bca76e
Author: zentol <ch...@apache.org>
Authored: Fri May 19 17:22:26 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:56:43 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/util/EnvironmentInformation.java    | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d4fba3b6/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 39fa80e..7d44f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -25,6 +25,7 @@ import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.flink.util.OperatingSystem;
 import org.apache.hadoop.util.VersionInfo;
 
 import org.slf4j.Logger;
@@ -232,6 +233,9 @@ public class EnvironmentInformation {
 	 * @return The limit of open file handles, or {@code -1}, if the limit could not be determined.
 	 */
 	public static long getOpenFileHandlesLimit() {
+		if (OperatingSystem.isWindows()) { // getMaxFileDescriptorCount method is not available on Windows
+			return -1L;
+		}
 		Class<?> sunBeanClass;
 		try {
 			sunBeanClass = Class.forName("com.sun.management.UnixOperatingSystemMXBean");


[5/5] flink git commit: [FLINK-5892] Enable 1.2 keyed state test

Posted by ch...@apache.org.
[FLINK-5892] Enable 1.2 keyed state test

This closes #3842.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79b86949
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79b86949
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79b86949

Branch: refs/heads/master
Commit: 79b869498c82c83baf44465aff5acac3bd948bf5
Parents: 8dfc9f9
Author: zentol <ch...@apache.org>
Authored: Mon May 8 11:56:57 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 16:00:32 2017 +0200

----------------------------------------------------------------------
 .../state/operator/restore/keyed/KeyedJob.java  |  14 ++++++--------
 .../operatorstate/complexKeyed/_metadata        | Bin 137490 -> 134953 bytes
 2 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79b86949/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index 6add7b2..3c28c3b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -97,10 +97,9 @@ public class KeyedJob {
 			.map(new StatefulStringStoringMap(mode, "first"))
 			.setParallelism(4);
 
-		// TODO: re-enable this when generating the actual 1.2 savepoint
-		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
-		map.uid("first");
-		//}
+		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+			map.uid("first");
+		}
 
 		return map;
 	}
@@ -110,10 +109,9 @@ public class KeyedJob {
 			.map(new StatefulStringStoringMap(mode, "second"))
 			.setParallelism(4);
 
-		// TODO: re-enable this when generating the actual 1.2 savepoint
-		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
-		map.uid("second");
-		//}
+		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+			map.uid("second");
+		}
 
 		return map;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/79b86949/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
index 9e03876..0a1ed10 100644
Binary files a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata and b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata differ