You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 18:49:42 UTC
[1/5] flink git commit: [FLINK-6715] Activate strict checkstyle for
flink-mesos
Repository: flink
Updated Branches:
refs/heads/master 0e69dd5cc -> 79b869498
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index bc303e9..1f33cb5 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,26 @@
package org.apache.flink.mesos.runtime.clusterframework;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.functions.Func1;
import com.netflix.fenzo.plugins.HostAttrValueConstraint;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.TestLogger;
import org.apache.mesos.Protos;
import org.junit.Test;
-import scala.Option;
import java.util.List;
+import scala.Option;
+
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+/**
+ * Tests for the {@link MesosTaskManagerParameters}.
+ */
public class MesosTaskManagerParametersTest extends TestLogger {
@Test
@@ -56,12 +62,12 @@ public class MesosTaskManagerParametersTest extends TestLogger {
assertEquals(0, MesosTaskManagerParameters.buildVolumes(Option.<String>apply("")).size());
}
- @Test(expected=IllegalArgumentException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testBuildVolumesBadMode() throws Exception {
MesosTaskManagerParameters.buildVolumes(Option.<String>apply("/hp:/cp:RF"));
}
- @Test(expected=IllegalArgumentException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testBuildVolumesMalformed() throws Exception {
MesosTaskManagerParameters.buildVolumes(Option.<String>apply("/hp:/cp:ro:extra"));
}
@@ -77,66 +83,65 @@ public class MesosTaskManagerParametersTest extends TestLogger {
assertEquals("/host/path", params.containerVolumes().get(0).getHostPath());
assertEquals(Protos.Volume.Mode.RO, params.containerVolumes().get(0).getMode());
}
-
+
+ @Test
+ public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
+
+ MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));
+ assertThat(mesosTaskManagerParameters.constraints().size(), is(2));
+ ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
+ @Override
+ public String call(String s) {
+ return "foo";
+ }
+ });
+ ConstraintEvaluator secondConstraintEvaluator = new HostAttrValueConstraint("az", new Func1<String, String>() {
+ @Override
+ public String call(String s) {
+ return "foo";
+ }
+ });
+ assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
+ assertThat(mesosTaskManagerParameters.constraints().get(1).getName(), is(secondConstraintEvaluator.getName()));
+
+ }
+
+ @Test
+ public void givenOneConstraintInConfigShouldBeParsed() throws Exception {
+
+ MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo"));
+ assertThat(mesosTaskManagerParameters.constraints().size(), is(1));
+ ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
+ @Override
+ public String call(String s) {
+ return "foo";
+ }
+ });
+ assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
+ }
+
@Test
- public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
-
- MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));
- assertThat(mesosTaskManagerParameters.constraints().size(), is(2));
- ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
- @Override
- public String call(String s) {
- return "foo";
- }
- });
- ConstraintEvaluator secondConstraintEvaluator = new HostAttrValueConstraint("az", new Func1<String, String>() {
- @Override
- public String call(String s) {
- return "foo";
- }
- });
- assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
- assertThat(mesosTaskManagerParameters.constraints().get(1).getName(), is(secondConstraintEvaluator.getName()));
-
- }
-
- @Test
- public void givenOneConstraintInConfigShouldBeParsed() throws Exception {
-
- MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo"));
- assertThat(mesosTaskManagerParameters.constraints().size(), is(1));
- ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
- @Override
- public String call(String s) {
- return "foo";
- }
- });
- assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
- }
-
- @Test
- public void givenEmptyConstraintInConfigShouldBeParsed() throws Exception {
-
- MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(""));
- assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
- }
-
- @Test
- public void givenInvalidConstraintInConfigShouldBeParsed() throws Exception {
-
- MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(",:,"));
- assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
- }
-
-
- private static Configuration withHardHostAttrConstraintConfiguration(final String configuration) {
- return new Configuration() {
- private static final long serialVersionUID = -3249384117909445760L;
-
- {
- setString(MesosTaskManagerParameters.MESOS_CONSTRAINTS_HARD_HOSTATTR, configuration);
- }
- };
- }
+ public void givenEmptyConstraintInConfigShouldBeParsed() throws Exception {
+
+ MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(""));
+ assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
+ }
+
+ @Test
+ public void givenInvalidConstraintInConfigShouldBeParsed() throws Exception {
+
+ MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(",:,"));
+ assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
+ }
+
+ private static Configuration withHardHostAttrConstraintConfiguration(final String configuration) {
+ return new Configuration() {
+ private static final long serialVersionUID = -3249384117909445760L;
+
+ {
+ setString(MesosTaskManagerParameters.MESOS_CONSTRAINTS_HARD_HOSTATTR, configuration);
+ }
+ };
+ }
}
[4/5] flink git commit: [FLINK-4497] [cassandra] Scala Case Classes /
Tuple support
Posted by ch...@apache.org.
[FLINK-4497] [cassandra] Scala Case Classes / Tuple support
This closes #2633.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8dfc9f9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8dfc9f9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8dfc9f9d
Branch: refs/heads/master
Commit: 8dfc9f9d0f06e4ea2376b0a58efd623e58735ae5
Parents: d4fba3b
Author: zentol <ch...@apache.org>
Authored: Mon Oct 10 15:30:48 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:58:51 2017 +0200
----------------------------------------------------------------------
.../flink-connector-cassandra/pom.xml | 11 +++
.../cassandra/AbstractCassandraTupleSink.java | 52 +++++++++++
.../cassandra/CassandraScalaProductSink.java | 41 +++++++++
.../connectors/cassandra/CassandraSink.java | 91 ++++++++++++++++----
.../connectors/cassandra/CassandraSinkBase.java | 15 ++--
.../cassandra/CassandraTupleSink.java | 27 +-----
.../cassandra/CassandraConnectorITCase.java | 55 ++++++++++++
7 files changed, 245 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index 2722c30..4a720c4 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -99,6 +99,12 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${driver.version}</version>
@@ -129,6 +135,11 @@ under the License.
</exclusions>
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
new file mode 100644
index 0000000..7a8d097
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Abstract sink to write tuple-like values into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
+ private final String insertQuery;
+ private transient PreparedStatement ps;
+
+ public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+ super(builder);
+ this.insertQuery = insertQuery;
+ }
+
+ @Override
+ public void open(Configuration configuration) {
+ super.open(configuration);
+ this.ps = session.prepare(insertQuery);
+ }
+
+ @Override
+ public ListenableFuture<ResultSet> send(IN value) {
+ Object[] fields = extract(value);
+ return session.executeAsync(ps.bind(fields));
+ }
+
+ protected abstract Object[] extract(IN record);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
new file mode 100644
index 0000000..a975985
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+
+import scala.Product;
+
+/**
+ * Sink to write scala tuples and case classes into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
+ */
+public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
+ public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
+ super(insertQuery, builder);
+ }
+
+ @Override
+ protected Object[] extract(IN record) {
+ Object[] al = new Object[record.productArity()];
+ for (int i = 0; i < record.productArity(); i++) {
+ al[i] = record.productElement(i);
+ }
+ return al;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 9f0079f..6a33601 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -22,7 +22,9 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -30,6 +32,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import scala.Product;
/**
* This class wraps different Cassandra sink implementations to provide a common interface for all of them.
@@ -190,13 +193,31 @@ public class CassandraSink<IN> {
* @param <IN> input type
* @return CassandraSinkBuilder, to further configure the sink
*/
+ public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input) {
+ return addSink(input.javaStream());
+ }
+
+ /**
+ * Writes a DataStream into a Cassandra database.
+ *
+ * @param input input DataStream
+ * @param <IN> input type
+ * @return CassandraSinkBuilder, to further configure the sink
+ */
public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
- if (input.getType() instanceof TupleTypeInfo) {
+ TypeInformation<IN> typeInfo = input.getType();
+ if (typeInfo instanceof TupleTypeInfo) {
DataStream<T> tupleInput = (DataStream<T>) input;
return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
- } else {
+ }
+ if (typeInfo instanceof PojoTypeInfo) {
return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
}
+ if (typeInfo instanceof CaseClassTypeInfo) {
+ DataStream<Product> productInput = (DataStream<Product>) input;
+ return (CassandraSinkBuilder<IN>) new CassandraScalaProductSinkBuilder<>(productInput, productInput.getType(), productInput.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
+ }
+ throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
}
public abstract static class CassandraSinkBuilder<IN> {
@@ -300,7 +321,16 @@ public class CassandraSink<IN> {
* @return finalized sink
* @throws Exception
*/
- public abstract CassandraSink<IN> build() throws Exception;
+ public CassandraSink<IN> build() throws Exception {
+ sanityCheck();
+ return isWriteAheadLogEnabled
+ ? createWriteAheadSink()
+ : createSink();
+ }
+
+ protected abstract CassandraSink<IN> createSink() throws Exception;
+
+ protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;
protected void sanityCheck() {
if (builder == null) {
@@ -323,15 +353,15 @@ public class CassandraSink<IN> {
}
@Override
- public CassandraSink<IN> build() throws Exception {
- sanityCheck();
- if (isWriteAheadLogEnabled) {
- return committer == null
- ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
- : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
- } else {
- return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
- }
+ public CassandraSink<IN> createSink() throws Exception {
+ return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
+ }
+
+ @Override
+ protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+ return committer == null
+ ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
+ : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
}
}
@@ -349,13 +379,38 @@ public class CassandraSink<IN> {
}
@Override
- public CassandraSink<IN> build() throws Exception {
- sanityCheck();
- if (isWriteAheadLogEnabled) {
- throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
- } else {
- return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+ public CassandraSink<IN> createSink() throws Exception {
+ return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+ }
+
+ @Override
+ protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+ throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+ }
+ }
+
+ public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
+
+ public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+ super(input, typeInfo, serializer);
+ }
+
+ @Override
+ protected void sanityCheck() {
+ super.sanityCheck();
+ if (query == null || query.length() == 0) {
+ throw new IllegalArgumentException("Query must not be null or empty.");
}
}
+
+ @Override
+ public CassandraSink<IN> createSink() throws Exception {
+ return new CassandraSink<>(input.addSink(new CassandraScalaProductSink<IN>(query, builder)).name("Cassandra Sink"));
+ }
+
+ @Override
+ protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+ throw new IllegalArgumentException("Exactly-once guarantees can only be provided for flink tuple types.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index b281525..b1b261e 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @param <IN> Type of the elements emitted by this sink
*/
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
- protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
protected transient Cluster cluster;
protected transient Session session;
@@ -48,7 +48,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
private final AtomicInteger updatesPending = new AtomicInteger();
- protected CassandraSinkBase(ClusterBuilder builder) {
+ CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
@@ -76,7 +76,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
}
exception = t;
- LOG.error("Error while sending value.", t);
+ log.error("Error while sending value.", t);
}
};
this.cluster = builder.getCluster();
@@ -107,21 +107,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
updatesPending.wait();
}
}
-
+
+ if (exception != null) {
+ throw new IOException("Error while sending value.", exception);
+ }
} finally {
try {
if (session != null) {
session.close();
}
} catch (Exception e) {
- LOG.error("Error while closing session.", e);
+ log.error("Error while closing session.", e);
}
try {
if (cluster != null) {
cluster.close();
}
} catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
+ log.error("Error while closing cluster.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
index 0a9ef06..a7ec1df 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -17,39 +17,20 @@
package org.apache.flink.streaming.connectors.cassandra;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
/**
- * Flink Sink to save data into a Cassandra cluster.
+ * Sink to write Flink {@link Tuple}s into a Cassandra cluster.
*
* @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
*/
-public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
- private final String insertQuery;
- private transient PreparedStatement ps;
-
+public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
- super(builder);
- this.insertQuery = insertQuery;
- }
-
- @Override
- public void open(Configuration configuration) {
- super.open(configuration);
- this.ps = session.prepare(insertQuery);
+ super(insertQuery, builder);
}
@Override
- public ListenableFuture<ResultSet> send(IN value) {
- Object[] fields = extract(value);
- return session.executeAsync(ps.bind(fields));
- }
-
- private Object[] extract(IN record) {
+ protected Object[] extract(IN record) {
Object[] al = new Object[record.getArity()];
for (int i = 0; i < record.getArity(); i++) {
al[i] = record.getField(i);
http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 06f3c35..95cd86c 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -31,14 +31,20 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.junit.AfterClass;
@@ -49,6 +55,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
import java.io.BufferedWriter;
import java.io.File;
@@ -420,4 +428,51 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
private String injectTableName(String target) {
return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
}
+
+ @Test
+ public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception {
+ Class<scala.Tuple1<String>> c = (Class<scala.Tuple1<String>>) new scala.Tuple1<>("hello").getClass();
+ Seq<TypeInformation<?>> typeInfos = JavaConverters.asScalaBufferConverter(
+ Collections.<TypeInformation<?>>singletonList(BasicTypeInfo.STRING_TYPE_INFO)).asScala();
+ Seq<String> fieldNames = JavaConverters.asScalaBufferConverter(
+ Collections.singletonList("_1")).asScala();
+
+ CaseClassTypeInfo<scala.Tuple1<String>> typeInfo = new CaseClassTypeInfo<scala.Tuple1<String>>(c, null, typeInfos, fieldNames) {
+ @Override
+ public TypeSerializer<scala.Tuple1<String>> createSerializer(ExecutionConfig config) {
+ return null;
+ }
+ };
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<scala.Tuple1<String>> input = env.fromElements(new scala.Tuple1<>("hello"));
+
+ CassandraSink.CassandraSinkBuilder<scala.Tuple1<String>> sinkBuilder = CassandraSink.addSink(input);
+ assertTrue(sinkBuilder instanceof CassandraSink.CassandraScalaProductSinkBuilder);
+ }
+
+ @Test
+ public void testCassandraScalaTupleAtLeastSink() throws Exception {
+ CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink = new CassandraScalaProductSink<>(INSERT_DATA_QUERY, builder);
+
+ List<scala.Tuple3<String, Integer, Integer>> scalaTupleCollection = new ArrayList<>(20);
+ for (int i = 0; i < 20; i++) {
+ scalaTupleCollection.add(new scala.Tuple3<>(UUID.randomUUID().toString(), i, 0));
+ }
+
+ sink.open(new Configuration());
+ for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) {
+ sink.invoke(value);
+ }
+ sink.close();
+
+ ResultSet rs = session.execute(SELECT_DATA_QUERY);
+ List<Row> rows = rs.all();
+ assertEquals(scalaTupleCollection.size(), rows.size());
+
+ for (Row row : rows) {
+ scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
+ }
+ assertEquals(0, scalaTupleCollection.size());
+ }
}
[2/5] flink git commit: [FLINK-6715] Activate strict checkstyle for
flink-mesos
Posted by ch...@apache.org.
[FLINK-6715] Activate strict checkstyle for flink-mesos
This closes #3988.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bca76ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bca76ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bca76ed
Branch: refs/heads/master
Commit: 0bca76ede8b7447014a7d7ed17633d77ecfafe18
Parents: 0e69dd5
Author: zentol <ch...@apache.org>
Authored: Thu May 25 10:25:24 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:56:26 2017 +0200
----------------------------------------------------------------------
flink-mesos/pom.xml | 35 +++++
.../main/java/org/apache/flink/mesos/Utils.java | 9 +-
.../flink/mesos/cli/FlinkMesosSessionCli.java | 15 +-
.../clusterframework/LaunchableMesosWorker.java | 35 ++---
.../MesosApplicationMasterRunner.java | 65 +++++----
.../clusterframework/MesosConfigKeys.java | 17 +--
.../MesosFlinkResourceManager.java | 101 +++++++-------
.../MesosTaskManagerParameters.java | 71 +++++-----
.../MesosTaskManagerRunner.java | 19 ++-
.../services/MesosServicesUtils.java | 3 +
.../services/StandaloneMesosServices.java | 3 +-
.../store/MesosWorkerStore.java | 13 +-
.../store/StandaloneMesosWorkerStore.java | 3 +-
.../store/ZooKeeperMesosWorkerStore.java | 16 ++-
.../flink/mesos/scheduler/SchedulerProxy.java | 10 +-
.../mesos/scheduler/TaskSchedulerBuilder.java | 2 +-
.../scheduler/messages/ResourceOffers.java | 1 +
.../flink/mesos/util/MesosArtifactResolver.java | 3 +-
.../flink/mesos/util/MesosArtifactServer.java | 50 ++++---
.../flink/mesos/util/MesosConfiguration.java | 29 ++--
.../apache/flink/mesos/util/ZooKeeperUtils.java | 6 +-
.../MesosFlinkResourceManagerTest.java | 122 +++++++++-------
.../MesosTaskManagerParametersTest.java | 139 ++++++++++---------
23 files changed, 427 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 4dae731..94187ee 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -300,6 +300,41 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 7787e40..308e093 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -20,12 +20,17 @@ package org.apache.flink.mesos;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
import org.apache.mesos.Protos;
-import scala.Option;
import java.net.URL;
import java.util.Arrays;
+import scala.Option;
+
+/**
+ * Collection of utility methods.
+ */
public class Utils {
/**
* Construct a Mesos environment variable.
@@ -53,7 +58,7 @@ public class Utils {
*/
public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
Option<URL> url = resolver.resolve(artifact.dest);
- if(url.isEmpty()) {
+ if (url.isEmpty()) {
throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
index dcce0b8..f6850d2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
@@ -18,10 +18,11 @@
package org.apache.flink.mesos.cli;
+import org.apache.flink.configuration.Configuration;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.configuration.Configuration;
import java.io.IOException;
import java.util.Map;
@@ -35,28 +36,30 @@ public class FlinkMesosSessionCli {
/**
* Decode encoded dynamic properties.
+ *
* @param dynamicPropertiesEncoded encoded properties produced by the encoding method.
* @return a configuration instance to be merged with the static configuration.
*/
public static Configuration decodeDynamicProperties(String dynamicPropertiesEncoded) {
try {
Configuration configuration = new Configuration();
- if(dynamicPropertiesEncoded != null) {
- TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {};
- Map<String,String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
+ if (dynamicPropertiesEncoded != null) {
+ TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {
+ };
+ Map<String, String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
for (Map.Entry<String, String> property : props.entrySet()) {
configuration.setString(property.getKey(), property.getValue());
}
}
return configuration;
- }
- catch(IOException ex) {
+ } catch (IOException ex) {
throw new IllegalArgumentException("unreadable encoded properties", ex);
}
}
/**
* Encode dynamic properties as a string to be transported as an environment variable.
+ *
* @param configuration the dynamic properties to encode.
* @return a string to be decoded later.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 04a406f..ce7bb9d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -18,23 +18,23 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.TaskAssignmentResult;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.VMTaskFitnessCalculator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.util.Collections;
import java.util.List;
@@ -42,15 +42,17 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
-import static org.apache.flink.mesos.Utils.variable;
+import scala.Option;
+
import static org.apache.flink.mesos.Utils.range;
import static org.apache.flink.mesos.Utils.ranges;
import static org.apache.flink.mesos.Utils.scalar;
+import static org.apache.flink.mesos.Utils.variable;
/**
* Implements the launch of a Mesos worker.
*
- * Translates the abstract {@link ContainerSpecification} into a concrete
+ * <p>Translates the abstract {@link ContainerSpecification} into a concrete
* Mesos-specific {@link Protos.TaskInfo}.
*/
public class LaunchableMesosWorker implements LaunchableTask {
@@ -59,9 +61,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
/**
* The set of configuration keys to be dynamically configured with a port allocated from Mesos.
*/
- private static String[] TM_PORT_KEYS = {
+ private static final String[] TM_PORT_KEYS = {
"taskmanager.rpc.port",
- "taskmanager.data.port" };
+ "taskmanager.data.port"};
private final MesosArtifactResolver resolver;
private final ContainerSpecification containerSpec;
@@ -88,7 +90,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
this.params = Preconditions.checkNotNull(params);
this.taskID = Preconditions.checkNotNull(taskID);
this.mesosConfiguration = Preconditions.checkNotNull(mesosConfiguration);
-
+
this.taskRequest = new Request();
}
@@ -204,7 +206,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
//configure task manager hostname property if hostname override property is supplied
Option<String> taskManagerHostnameOption = params.getTaskManagerHostname();
- if(taskManagerHostnameOption.isDefined()) {
+ if (taskManagerHostnameOption.isDefined()) {
// replace the TASK_ID pattern by the actual task id value of the Mesos task
final String taskManagerHostname = MesosTaskManagerParameters.TASK_ID_PATTERN
.matcher(taskManagerHostnameOption.get())
@@ -225,7 +227,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
}
// ship additional files
- for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
+ for (ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
cmd.addUris(Utils.uri(resolver, artifact));
}
@@ -271,9 +273,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
// in event that no docker image or mesos image name is specified, we must still
// set type to MESOS
containerInfo.setType(Protos.ContainerInfo.Type.MESOS);
- switch(params.containerType()) {
+ switch (params.containerType()) {
case MESOS:
- if(params.containerImageName().isDefined()) {
+ if (params.containerImageName().isDefined()) {
containerInfo
.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
.setImage(Protos.Image.newBuilder()
@@ -285,7 +287,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
case DOCKER:
assert(params.containerImageName().isDefined());
- containerInfo
+ containerInfo
.setType(Protos.ContainerInfo.Type.DOCKER)
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
@@ -300,7 +302,6 @@ public class LaunchableMesosWorker implements LaunchableTask {
containerInfo.addAllVolumes(params.containerVolumes());
taskInfo.setContainer(containerInfo);
-
return taskInfo.build();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 09ef380..fc75bd7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,16 +18,6 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.Props;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -62,15 +52,19 @@ import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
-import org.apache.mesos.Protos;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
@@ -82,6 +76,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -89,24 +87,26 @@ import static org.apache.flink.util.Preconditions.checkState;
* It starts actor system and the actors for {@link JobManager}
* and {@link MesosFlinkResourceManager}.
*
- * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * <p>The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
* allocation and failure detection.
*/
public class MesosApplicationMasterRunner {
- /** Logger */
+
protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
- /** The maximum time that TaskManagers may be waiting to register at the JobManager,
- * before they quit */
+ /**
+ * The maximum time that TaskManagers may be waiting to register at the JobManager,
+ * before they quit.
+ */
private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
- /** The process environment variables */
+ /** The process environment variables. */
private static final Map<String, String> ENV = System.getenv();
- /** The exit code returned if the initialization of the application master failed */
+ /** The exit code returned if the initialization of the application master failed. */
private static final int INIT_ERROR_EXIT_CODE = 31;
- /** The exit code returned if the process exits because a critical actor died */
+ /** The exit code returned if the process exits because a critical actor died. */
private static final int ACTOR_DIED_EXIT_CODE = 32;
// ------------------------------------------------------------------------
@@ -142,7 +142,7 @@ public class MesosApplicationMasterRunner {
/**
* The instance entry point for the Mesos AppMaster. Obtains user group
- * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
+ * information and calls the main work method {@link #runPrivileged(Configuration, Configuration)} as a
* privileged action.
*
* @param args The command line arguments.
@@ -318,7 +318,6 @@ public class MesosApplicationMasterRunner {
getJobManagerClass(),
getArchivistClass())._1();
-
// 2: the web monitor
LOG.debug("Starting Web Frontend");
@@ -328,7 +327,7 @@ public class MesosApplicationMasterRunner {
actorSystem,
jobManager,
LOG);
- if(webMonitor != null) {
+ if (webMonitor != null) {
final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
}
@@ -380,7 +379,7 @@ public class MesosApplicationMasterRunner {
}
}
- if(artifactServer != null) {
+ if (artifactServer != null) {
try {
artifactServer.stop();
} catch (Throwable ignored) {
@@ -396,7 +395,7 @@ public class MesosApplicationMasterRunner {
}
}
- if(futureExecutor != null) {
+ if (futureExecutor != null) {
try {
futureExecutor.shutdownNow();
} catch (Throwable tt) {
@@ -404,7 +403,7 @@ public class MesosApplicationMasterRunner {
}
}
- if(ioExecutor != null) {
+ if (ioExecutor != null) {
try {
ioExecutor.shutdownNow();
} catch (Throwable tt) {
@@ -493,7 +492,7 @@ public class MesosApplicationMasterRunner {
.setHostname(hostname);
Protos.Credential.Builder credential = null;
- if(!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
+ if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
}
String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
@@ -517,7 +516,7 @@ public class MesosApplicationMasterRunner {
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
- if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+ if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
frameworkInfo.setPrincipal(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
@@ -526,7 +525,7 @@ public class MesosApplicationMasterRunner {
// some environments use a side-channel to communicate the secret to Mesos,
// and thus don't set the 'secret' configuration setting
- if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+ if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
credential.setSecret(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
}
@@ -545,7 +544,7 @@ public class MesosApplicationMasterRunner {
* needs (such as JAR file, config file, ...) and all environment variables into a container specification.
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
* A lightweight HTTP server serves the artifacts to the fetcher.
- */
+ */
private static void applyOverlays(
Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
@@ -565,7 +564,7 @@ public class MesosApplicationMasterRunner {
private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
// serve the artifacts associated with the container environment
- for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+ for (ContainerSpecification.Artifact artifact : container.getArtifacts()) {
server.addPath(artifact.source, artifact.dest);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index 35da95f..1af94e2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -27,30 +27,31 @@ public class MesosConfigKeys {
// ------------------------------------------------------------------------
/**
- * The Mesos task ID, used by the TM for informational purposes
+ * The Mesos task ID, used by the TM for informational purposes.
*/
public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
/**
- * Reserved for future enhancement
+ * Reserved for future enhancement.
*/
public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
/**
- * JVM arguments, used by the JM and TM
+ * JVM arguments, used by the JM and TM.
*/
public static final String ENV_JVM_ARGS = "JVM_ARGS";
/**
- * Standard environment variables used in DCOS environment
+ * Standard environment variables used in DCOS environment.
*/
public static final String ENV_TASK_NAME = "TASK_NAME";
/**
- * Standard environment variables used in DCOS environment
- */
+ * Standard environment variables used in DCOS environment.
+ */
public static final String ENV_FRAMEWORK_NAME = "FRAMEWORK_NAME";
- /** Private constructor to prevent instantiation */
- private MesosConfigKeys() {}
+ /** Private constructor to prevent instantiation. */
+ private MesosConfigKeys() {
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 17ffef7..6c708fa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -18,20 +18,14 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.SchedulerProxy;
import org.apache.flink.mesos.scheduler.TaskMonitor;
@@ -48,17 +42,23 @@ import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.FrameworkInfo;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
-import scala.Option;
import java.util.ArrayList;
import java.util.Collection;
@@ -66,6 +66,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import scala.Option;
+
import static java.util.Objects.requireNonNull;
/**
@@ -73,25 +75,25 @@ import static java.util.Objects.requireNonNull;
*/
public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
- /** The Mesos configuration (master and framework info) */
+ /** The Mesos configuration (master and framework info). */
private final MesosConfiguration mesosConfig;
- /** The TaskManager container parameters (like container memory size) */
+ /** The TaskManager container parameters (like container memory size). */
private final MesosTaskManagerParameters taskManagerParameters;
- /** Container specification for launching a TM */
+ /** Container specification for launching a TM. */
private final ContainerSpecification taskManagerContainerSpec;
- /** Resolver for HTTP artifacts **/
+ /** Resolver for HTTP artifacts. **/
private final MesosArtifactResolver artifactResolver;
/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
private final int maxFailedTasks;
- /** Callback handler for the asynchronous Mesos scheduler */
+ /** Callback handler for the asynchronous Mesos scheduler. */
private SchedulerProxy schedulerCallbackHandler;
- /** Mesos scheduler driver */
+ /** Mesos scheduler driver. */
private SchedulerDriver schedulerDriver;
private ActorRef connectionMonitor;
@@ -104,12 +106,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
private final MesosWorkerStore workerStore;
- /** planning state related to workers - package private for unit test purposes */
+ /** planning state related to workers - package private for unit test purposes. */
final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
- /** The number of failed tasks since the master became active */
+ /** The number of failed tasks since the master became active. */
private int failedTasksSoFar;
public MesosFlinkResourceManager(
@@ -158,7 +160,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
.setCheckpoint(true);
Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
- if(frameworkID.isEmpty()) {
+ if (frameworkID.isEmpty()) {
LOG.info("Registering as new framework.");
}
else {
@@ -248,7 +250,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
taskTerminated(msg.taskID(), msg.status());
- } else {
+ } else {
// message handled by the generic resource master code
super.handleMessage(message);
}
@@ -267,15 +269,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
try {
// unregister the framework, which implicitly removes all tasks.
schedulerDriver.stop(false);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
LOG.warn("unable to unregister the framework", ex);
}
try {
workerStore.stop(true);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
LOG.warn("unable to stop the worker state store", ex);
}
@@ -308,13 +308,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
if (!tasksFromPreviousAttempts.isEmpty()) {
LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
- List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+ List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
- switch(worker.state()) {
+ switch (worker.state()) {
case New:
workersInNew.put(extractResourceID(worker.taskID()), worker);
toLaunch.add(launchable);
@@ -331,11 +331,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
}
// tell the launch coordinator about prior assignments
- if(toAssign.size() >= 1) {
+ if (toAssign.size() >= 1) {
launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
}
// tell the launch coordinator to launch any new tasks
- if(toLaunch.size() >= 1) {
+ if (toLaunch.size() >= 1) {
launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
}
}
@@ -374,11 +374,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
}
// tell the launch coordinator to launch the new tasks
- if(toLaunch.size() >= 1) {
+ if (toLaunch.size() >= 1) {
launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
}
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to request new workers", ex);
}
}
@@ -386,7 +385,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
/**
* Accept offers as advised by the launch coordinator.
*
- * Acceptance is routed through the RM to update the persistent state before
+ * <p>Acceptance is routed through the RM to update the persistent state before
* forwarding the message to Mesos.
*/
private void acceptOffers(AcceptOffers msg) {
@@ -421,15 +420,14 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
// send the acceptance message to Mesos
schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to accept offers", ex);
}
}
/**
* Handle a task status change.
- */
+ */
private void taskStatusUpdated(StatusUpdate message) {
taskRouter.tell(message, self());
reconciliationCoordinator.tell(message, self());
@@ -470,9 +468,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
if (worker != null) {
LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
accepted.add(new RegisteredMesosWorkerNode(worker));
- }
- else {
- if(isStarted(resourceID)) {
+ } else {
+ if (isStarted(resourceID)) {
LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
}
else {
@@ -549,8 +546,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
try {
workerStore.setFrameworkID(Option.apply(message.frameworkId()));
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to store the assigned framework ID", ex);
return;
}
@@ -598,13 +594,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
boolean existed;
try {
existed = workerStore.removeWorker(taskID);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to remove worker", ex);
return;
}
- if(!existed) {
+ if (!existed) {
LOG.info("Received a termination notice for an unrecognized worker: {}", id);
return;
}
@@ -695,11 +690,15 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
* @return goal state information for the {@Link TaskMonitor}.
*/
static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
- switch(worker.state()) {
- case New: return new TaskMonitor.New(worker.taskID());
- case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
- case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
- default: throw new IllegalArgumentException("unsupported worker state");
+ switch (worker.state()) {
+ case New:
+ return new TaskMonitor.New(worker.taskID());
+ case Launched:
+ return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+ case Released:
+ return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+ default:
+ throw new IllegalArgumentException("unsupported worker state");
}
}
@@ -727,7 +726,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
/**
* Creates the props needed to instantiate this actor.
*
- * Rather than extracting and validating parameters in the constructor, this factory method takes
+ * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
* care of that. That way, errors occur synchronously, and are not swallowed simply in a
* failed asynchronous attempt to start the actor.
@@ -746,7 +745,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
*
* @return The Props object to instantiate the MesosFlinkResourceManager actor.
*/
- public static Props createActorProps(Class<? extends MesosFlinkResourceManager> actorClass,
+ public static Props createActorProps(
+ Class<? extends MesosFlinkResourceManager> actorClass,
Configuration flinkConfig,
MesosConfiguration mesosConfig,
MesosWorkerStore workerStore,
@@ -754,8 +754,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
MesosTaskManagerParameters taskManagerParameters,
ContainerSpecification taskManagerContainerSpec,
MesosArtifactResolver artifactResolver,
- Logger log)
- {
+ Logger log) {
final int numInitialTaskManagers = flinkConfig.getInteger(
ConfigConstants.MESOS_INITIAL_TASKS, 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 4324469..f5a415e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -18,34 +18,36 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.functions.Func1;
-import com.netflix.fenzo.plugins.HostAttrValueConstraint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.functions.Func1;
+import com.netflix.fenzo.plugins.HostAttrValueConstraint;
import org.apache.mesos.Protos;
-import scala.Option;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
+import scala.Option;
+
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* This class describes the Mesos-specific parameters for launching a TaskManager process.
*
- * These parameters are in addition to the common parameters
+ * <p>These parameters are in addition to the common parameters
* provided by {@link ContaineredTaskManagerParameters}.
*/
public class MesosTaskManagerParameters {
- /** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task */
+ /** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task. */
public static final Pattern TASK_ID_PATTERN = Pattern.compile("_TASK_", Pattern.LITERAL);
public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
@@ -75,11 +77,11 @@ public class MesosTaskManagerParameters {
public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
key("mesos.resourcemanager.tasks.bootstrap-cmd")
.noDefaultValue();
-
+
public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
key("mesos.resourcemanager.tasks.container.volumes")
.noDefaultValue();
-
+
public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
key("mesos.constraints.hard.hostattribute")
.noDefaultValue();
@@ -102,9 +104,9 @@ public class MesosTaskManagerParameters {
private final ContaineredTaskManagerParameters containeredParameters;
private final List<Protos.Volume> containerVolumes;
-
+
private final List<ConstraintEvaluator> constraints;
-
+
private final Option<String> bootstrapCommand;
private final Option<String> taskManagerHostname;
@@ -129,10 +131,9 @@ public class MesosTaskManagerParameters {
this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
}
-
- /**
+ /**
* Get the CPU units to use for the TaskManager process.
- */
+ */
public double cpus() {
return cpus;
}
@@ -140,49 +141,53 @@ public class MesosTaskManagerParameters {
/**
* Get the container type (Mesos or Docker). The default is Mesos.
*
- * Mesos provides a facility for a framework to specify which containerizer to use.
- */
+ * <p>Mesos provides a facility for a framework to specify which containerizer to use.
+ */
public ContainerType containerType() {
return containerType;
}
/**
* Get the container image name.
- */
+ */
public Option<String> containerImageName() {
return containerImageName;
}
/**
* Get the common containered parameters.
- */
+ */
public ContaineredTaskManagerParameters containeredParameters() {
return containeredParameters;
}
/**
- * Get the container volumes string
+ * Get the container volumes string.
*/
public List<Protos.Volume> containerVolumes() {
return containerVolumes;
}
/**
- * Get the placement constraints
+ * Get the placement constraints.
*/
public List<ConstraintEvaluator> constraints() {
return constraints;
}
/**
- * Get the taskManager hostname.
- */
- public Option<String> getTaskManagerHostname() { return taskManagerHostname; }
+ * Get the taskManager hostname.
+ */
+ public Option<String> getTaskManagerHostname() {
+ return taskManagerHostname;
+ }
/**
- * Get the bootstrap command.
- */
- public Option<String> bootstrapCommand() { return bootstrapCommand; }
+ * Get the bootstrap command.
+ */
+ public Option<String> bootstrapCommand() {
+ return bootstrapCommand;
+ }
@Override
public String toString() {
@@ -200,8 +205,9 @@ public class MesosTaskManagerParameters {
/**
* Create the Mesos TaskManager parameters.
+ *
* @param flinkConfig the TM configuration.
- */
+ */
public static MesosTaskManagerParameters create(Configuration flinkConfig) {
List<ConstraintEvaluator> constraints = parseConstraints(flinkConfig.getString(MESOS_CONSTRAINTS_HARD_HOSTATTR));
@@ -212,7 +218,7 @@ public class MesosTaskManagerParameters {
flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
- if(cpus <= 0.0) {
+ if (cpus <= 0.0) {
cpus = Math.max(containeredParameters.numSlots(), 1.0);
}
@@ -221,13 +227,13 @@ public class MesosTaskManagerParameters {
ContainerType containerType;
String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
- switch(containerTypeString) {
+ switch (containerTypeString) {
case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
containerType = ContainerType.MESOS;
break;
case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
containerType = ContainerType.DOCKER;
- if(imageName == null || imageName.length() == 0) {
+ if (imageName == null || imageName.length() == 0) {
throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
" must be specified for docker container type");
}
@@ -250,7 +256,7 @@ public class MesosTaskManagerParameters {
cpus,
containerType,
Option.apply(imageName),
- containeredParameters,
+ containeredParameters,
containerVolumes,
constraints,
tmBootstrapCommand,
@@ -287,7 +293,7 @@ public class MesosTaskManagerParameters {
}
}));
}
-
+
/**
* Used to build volume specs for mesos. This allows for mounting additional volumes into a container
*
@@ -341,6 +347,9 @@ public class MesosTaskManagerParameters {
}
}
+ /**
+ * The supported containerizers.
+ */
public enum ContainerType {
MESOS,
DOCKER
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 625880b..e1b0efa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,14 +18,6 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -36,14 +28,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
-
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
/**
* The entry point for running a TaskManager in a Mesos container.
*/
@@ -59,7 +58,7 @@ public class MesosTaskManagerRunner {
.addOption(BootstrapTools.newDynamicPropertiesOption());
}
- /** The process environment variables */
+ /** The process environment variables. */
private static final Map<String, String> ENV = System.getenv();
public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index 0ac5f4e..a28020a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
import org.apache.flink.util.ConfigurationUtil;
+/**
+ * Utilities for the {@link MesosServices}.
+ */
public class MesosServicesUtils {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
index dfbc2c3..aa3157f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -35,5 +35,6 @@ public class StandaloneMesosServices implements MesosServices {
}
@Override
- public void close(boolean cleanup) throws Exception {}
+ public void close(boolean cleanup) throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index d6ff6bc..f1f54ce 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -19,13 +19,14 @@
package org.apache.flink.mesos.runtime.clusterframework.store;
import org.apache.mesos.Protos;
-import scala.Option;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Objects;
+import scala.Option;
+
import static java.util.Objects.requireNonNull;
/**
@@ -83,7 +84,7 @@ public interface MesosWorkerStore {
/**
* A stored worker.
*
- * The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed
+ * <p>The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed
* by Fenzo for optimization purposes.
*/
class Worker implements Serializable {
@@ -112,28 +113,28 @@ public interface MesosWorkerStore {
/**
* Get the worker's task ID.
- */
+ */
public Protos.TaskID taskID() {
return taskID;
}
/**
* Get the worker's assigned slave ID.
- */
+ */
public Option<Protos.SlaveID> slaveID() {
return slaveID;
}
/**
* Get the worker's assigned hostname.
- */
+ */
public Option<String> hostname() {
return hostname;
}
/**
* Get the worker's state.
- */
+ */
public WorkerState state() {
return state;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
index c6ccbee..b43b89c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
@@ -19,13 +19,14 @@
package org.apache.flink.mesos.runtime.clusterframework.store;
import org.apache.mesos.Protos;
-import scala.Option;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import scala.Option;
+
/**
* A standalone Mesos worker store.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 4544b8e..92e4416 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -26,11 +26,11 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
import org.apache.flink.util.FlinkException;
+
import org.apache.mesos.Protos;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,6 +38,8 @@ import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
+import scala.Option;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -53,13 +55,13 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
/** Flag indicating whether this instance is running. */
private boolean isRunning;
- /** A persistent value of the assigned framework ID */
+ /** A persistent value of the assigned framework ID. */
private final ZooKeeperSharedValue frameworkIdInZooKeeper;
- /** A persistent count of all tasks created, for generating unique IDs */
+ /** A persistent count of all tasks created, for generating unique IDs. */
private final ZooKeeperSharedCount totalTaskCountInZooKeeper;
- /** A persistent store of serialized workers */
+ /** A persistent store of serialized workers. */
private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper;
@SuppressWarnings("unchecked")
@@ -69,7 +71,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
ZooKeeperSharedCount totalTaskCountInZooKeeper) throws Exception {
this.workersInZooKeeper = checkNotNull(workersInZooKeeper, "workersInZooKeeper");
this.frameworkIdInZooKeeper = checkNotNull(frameworkIdInZooKeeper, "frameworkIdInZooKeeper");
- this.totalTaskCountInZooKeeper= checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
+ this.totalTaskCountInZooKeeper = checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
}
@Override
@@ -89,7 +91,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
frameworkIdInZooKeeper.close();
totalTaskCountInZooKeeper.close();
- if(cleanup) {
+ if (cleanup) {
workersInZooKeeper.releaseAndTryRemoveAll();
}
@@ -237,7 +239,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
synchronized (startStopLock) {
verifyIsRunning();
- if(workersInZooKeeper.exists(path) == -1) {
+ if (workersInZooKeeper.exists(path) == -1) {
LOG.debug("No such worker {} in ZooKeeper.", taskID);
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
index b6d3383..0ac05c1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
@@ -18,8 +18,6 @@
package org.apache.flink.mesos.scheduler;
-import akka.actor.ActorRef;
-
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
@@ -30,6 +28,8 @@ import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.SlaveLost;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+
+import akka.actor.ActorRef;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
@@ -39,14 +39,14 @@ import java.util.List;
/**
* This class reacts to callbacks from the Mesos scheduler driver.
*
- * In order to preserve actor concurrency safety, this class simply sends
+ * <p>In order to preserve actor concurrency safety, this class simply sends
* corresponding messages to the Mesos resource master actor.
*
- * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ * <p>See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
*/
public class SchedulerProxy implements Scheduler {
- /** The actor to which we report the callbacks */
+ /** The actor to which we report the callbacks. */
private final ActorRef mesosActor;
public SchedulerProxy(ActorRef mesosActor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
index c841e22..a5653e1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
@@ -25,7 +25,7 @@ import com.netflix.fenzo.functions.Action1;
/**
* A builder for the Fenzo task scheduler.
*
- * Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
+ * <p>Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
*/
public interface TaskSchedulerBuilder {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
index dc5111d..8ca7e42 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
@@ -22,6 +22,7 @@ import org.apache.mesos.Protos;
import java.io.Serializable;
import java.util.List;
+
import static java.util.Objects.requireNonNull;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
index a6a26dc..c62ab84 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -19,10 +19,11 @@
package org.apache.flink.mesos.util;
import org.apache.flink.core.fs.Path;
-import scala.Option;
import java.net.URL;
+import scala.Option;
+
/**
* An interface for resolving artifact URIs.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index ae826db..967d818 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,6 +18,14 @@
package org.apache.flink.mesos.util;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.net.SSLUtils;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@@ -42,25 +50,17 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
-
import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.net.SSLUtils;
import org.jets3t.service.utils.Mimetypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -69,6 +69,8 @@ import java.net.URL;
import java.util.HashMap;
import java.util.Map;
+import scala.Option;
+
import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -81,11 +83,10 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
/**
* A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher.
*
- * More information:
+ * <p>More information:
* http://mesos.apache.org/documentation/latest/fetcher/
* http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
*/
@@ -101,7 +102,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
private final URL baseURL;
- private final Map<Path,URL> paths = new HashMap<>();
+ private final Map<Path, URL> paths = new HashMap<>();
private final SSLContext serverSSLContext;
@@ -153,7 +154,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
};
- NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
this.bootstrap = new ServerBootstrap();
@@ -169,7 +170,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();
- String httpProtocol = (serverSSLContext != null) ? "https": "http";
+ String httpProtocol = (serverSSLContext != null) ? "https" : "http";
baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
@@ -201,7 +202,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
* @param remoteFile the remote path with which to locate the file.
* @return the fully-qualified remote path to the file.
* @throws MalformedURLException if the remote path is invalid.
- */
+ */
public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException {
return addPath(new Path(localFile.toURI()), new Path(remoteFile));
}
@@ -214,10 +215,10 @@ public class MesosArtifactServer implements MesosArtifactResolver {
* @throws MalformedURLException if the remote path is invalid.
*/
public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException {
- if(paths.containsKey(remoteFile)) {
+ if (paths.containsKey(remoteFile)) {
throw new IllegalArgumentException("duplicate path registered");
}
- if(remoteFile.isAbsolute()) {
+ if (remoteFile.isAbsolute()) {
throw new IllegalArgumentException("not expecting an absolute path");
}
URL fileURL = new URL(baseURL, remoteFile.toString());
@@ -229,7 +230,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
public synchronized void removePath(Path remoteFile) {
- if(paths.containsKey(remoteFile)) {
+ if (paths.containsKey(remoteFile)) {
URL fileURL = null;
try {
fileURL = new URL(baseURL, remoteFile.toString());
@@ -274,11 +275,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
public VirtualFileServerHandler(Path path) throws IOException {
this.path = path;
- if(!path.isAbsolute()) {
+ if (!path.isAbsolute()) {
throw new IllegalArgumentException("path must be absolute: " + path.toString());
}
this.fs = path.getFileSystem();
- if(!fs.exists(path) || fs.getFileStatus(path).isDir()) {
+ if (!fs.exists(path) || fs.getFileStatus(path).isDir()) {
throw new IllegalArgumentException("no such file: " + path.toString());
}
}
@@ -292,12 +293,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
LOG.debug("{} request for file '{}'", request.getMethod(), path);
}
- if(!(request.getMethod() == GET || request.getMethod() == HEAD)) {
+ if (!(request.getMethod() == GET || request.getMethod() == HEAD)) {
sendMethodNotAllowed(ctx);
return;
}
-
final FileStatus status;
try {
status = fs.getFileStatus(path);
@@ -322,8 +322,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
final FSDataInputStream stream = fs.open(path);
try {
ctx.write(new ChunkedStream(stream));
- }
- catch(Exception e) {
+ } catch (Exception e) {
stream.close();
throw e;
}
@@ -369,7 +368,6 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
}
-
/**
* Handle a request for a non-existent file.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
index 40dc41c..7660e9c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
@@ -23,10 +23,11 @@ import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
-import scala.Option;
import java.util.Map;
+import scala.Option;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -53,7 +54,7 @@ public class MesosConfiguration {
/**
* The Mesos connection string.
*
- * The value should be in one of the following forms:
+ * <p>The value should be in one of the following forms:
* <pre>
* {@code
* host:port
@@ -62,28 +63,28 @@ public class MesosConfiguration {
* file:///path/to/file (where file contains one of the above)
* }
* </pre>
- */
+ */
public String masterUrl() {
return masterUrl;
}
/**
* The framework registration info.
- */
+ */
public Protos.FrameworkInfo.Builder frameworkInfo() {
return frameworkInfo;
}
/**
* The credential to authenticate the framework principal.
- */
+ */
public Option<Protos.Credential.Builder> credential() {
return credential;
}
/**
* Revise the configuration with updated framework info.
- */
+ */
public MesosConfiguration withFrameworkInfo(Protos.FrameworkInfo.Builder frameworkInfo) {
return new MesosConfiguration(masterUrl, frameworkInfo, credential);
}
@@ -92,11 +93,11 @@ public class MesosConfiguration {
* Create the Mesos scheduler driver based on this configuration.
* @param scheduler the scheduler to use.
* @param implicitAcknowledgements whether to configure the driver for implicit acknowledgements.
- * @return a scheduler driver.
- */
+ * @return a scheduler driver.
+ */
public SchedulerDriver createDriver(Scheduler scheduler, boolean implicitAcknowledgements) {
MesosSchedulerDriver schedulerDriver;
- if(this.credential().isDefined()) {
+ if (this.credential().isDefined()) {
schedulerDriver =
new MesosSchedulerDriver(scheduler, frameworkInfo.build(), this.masterUrl(),
implicitAcknowledgements, this.credential().get().build());
@@ -119,11 +120,11 @@ public class MesosConfiguration {
}
/**
- * A utility method to log relevant Mesos connection info
- */
+ * A utility method to log relevant Mesos connection info.
+ */
public static void logMesosConfig(Logger log, MesosConfiguration config) {
- Map<String,String> env = System.getenv();
+ Map<String, String> env = System.getenv();
Protos.FrameworkInfo.Builder info = config.frameworkInfo();
log.info("--------------------------------------------------------------------------------");
@@ -137,10 +138,10 @@ public class MesosConfiguration {
log.info(" Role: {}", info.hasRole() ? info.getRole() : "(none)");
log.info(" Principal: {}", info.hasPrincipal() ? info.getPrincipal() : "(none)");
log.info(" Host: {}", info.hasHostname() ? info.getHostname() : "(none)");
- if(env.containsKey("LIBPROCESS_IP")) {
+ if (env.containsKey("LIBPROCESS_IP")) {
log.info(" LIBPROCESS_IP: {}", env.get("LIBPROCESS_IP"));
}
- if(env.containsKey("LIBPROCESS_PORT")) {
+ if (env.containsKey("LIBPROCESS_PORT")) {
log.info(" LIBPROCESS_PORT: {}", env.get("LIBPROCESS_PORT"));
}
log.info(" Web UI: {}", info.hasWebuiUrl() ? info.getWebuiUrl() : "(none)");
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
index 6892a65..4211642 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
@@ -18,9 +18,13 @@
package org.apache.flink.mesos.util;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.configuration.Configuration;
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Utilities for Zookeeper.
+ */
public class ZooKeeperUtils {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7ab4e40..af3f7ef 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,29 +18,33 @@
package org.apache.flink.mesos.runtime.clusterframework;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.*;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.messages.*;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -50,27 +54,43 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
-import org.apache.mesos.SchedulerDriver;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.netflix.fenzo.ConstraintEvaluator;
+import junit.framework.AssertionFailedError;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
-import static java.util.Collections.singletonList;
+import scala.Option;
+import static java.util.Collections.singletonList;
import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* General tests for the Mesos resource manager component.
@@ -125,13 +145,24 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
}
@Override
- protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); }
+ protected ActorRef createConnectionMonitor() {
+ return connectionMonitor.ref();
+ }
+
@Override
- protected ActorRef createTaskRouter() { return taskRouter.ref(); }
+ protected ActorRef createTaskRouter() {
+ return taskRouter.ref();
+ }
+
@Override
- protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); }
+ protected ActorRef createLaunchCoordinator() {
+ return launchCoordinator.ref();
+ }
+
@Override
- protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); }
+ protected ActorRef createReconciliationCoordinator() {
+ return reconciliationCoordinator.ref();
+ }
@Override
protected void fatalError(String message, Throwable error) {
@@ -214,11 +245,11 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
- 1.0,
- MesosTaskManagerParameters.ContainerType.MESOS,
- Option.<String>empty(),
- containeredParams,
- Collections.<Protos.Volume>emptyList(),
+ 1.0,
+ MesosTaskManagerParameters.ContainerType.MESOS,
+ Option.<String>empty(),
+ containeredParams,
+ Collections.<Protos.Volume>emptyList(),
Collections.<ConstraintEvaluator>emptyList(),
Option.<String>empty(),
Option.<String>empty());
@@ -301,8 +332,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
register(Collections.<ResourceID>emptyList());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -331,8 +361,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
register(singletonList(extractResourceID(task1)));
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -364,8 +393,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
// verify that the internal state was updated
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -409,8 +437,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
// verify that the instance state was updated
assertThat(resourceManagerInstance.workersBeingReturned.entrySet(), empty());
verify(workerStore).removeWorker(task1);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -442,8 +469,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -508,8 +534,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.taskRouter.expectMsg(
new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched)));
verify(schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -567,8 +592,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
// verify that the instance state was updated
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
verify(workerStore).newTaskID();
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -604,8 +628,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
expectMsgClass(ResourceRemoved.class);
verify(workerStore).newTaskID();
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -632,8 +655,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
verify(schedulerDriver).stop(false);
verify(workerStore).stop(true);
expectTerminated(resourceManager.actor());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -665,8 +687,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Registered.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(Registered.class);
resourceManagerInstance.taskRouter.expectMsgClass(Registered.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -697,8 +718,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(ReRegistered.class);
resourceManagerInstance.taskRouter.expectMsgClass(ReRegistered.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -726,8 +746,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Disconnected.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(Disconnected.class);
resourceManagerInstance.taskRouter.expectMsgClass(Disconnected.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -752,8 +771,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
watch(resourceManager.actor());
resourceManager.tell(new Error("test"), resourceManager);
expectTerminated(resourceManager.actor());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
[3/5] flink git commit: [FLINK-6642] Return -1 in
EnvInfo#getOpenFileHandlesLimit
Posted by ch...@apache.org.
[FLINK-6642] Return -1 in EnvInfo#getOpenFileHandlesLimit
This closes #3956.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4fba3b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4fba3b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4fba3b6
Branch: refs/heads/master
Commit: d4fba3b674fc67db198e4cf163de6ca44b5caf8e
Parents: 0bca76e
Author: zentol <ch...@apache.org>
Authored: Fri May 19 17:22:26 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:56:43 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/util/EnvironmentInformation.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d4fba3b6/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 39fa80e..7d44f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -25,6 +25,7 @@ import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
+import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
@@ -232,6 +233,9 @@ public class EnvironmentInformation {
* @return The limit of open file handles, or {@code -1}, if the limit could not be determined.
*/
public static long getOpenFileHandlesLimit() {
+ if (OperatingSystem.isWindows()) { // getMaxFileDescriptorCount method is not available on Windows
+ return -1L;
+ }
Class<?> sunBeanClass;
try {
sunBeanClass = Class.forName("com.sun.management.UnixOperatingSystemMXBean");
[5/5] flink git commit: [FLINK-5892] Enable 1.2 keyed state test
Posted by ch...@apache.org.
[FLINK-5892] Enable 1.2 keyed state test
This closes #3842.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79b86949
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79b86949
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79b86949
Branch: refs/heads/master
Commit: 79b869498c82c83baf44465aff5acac3bd948bf5
Parents: 8dfc9f9
Author: zentol <ch...@apache.org>
Authored: Mon May 8 11:56:57 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 16:00:32 2017 +0200
----------------------------------------------------------------------
.../state/operator/restore/keyed/KeyedJob.java | 14 ++++++--------
.../operatorstate/complexKeyed/_metadata | Bin 137490 -> 134953 bytes
2 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79b86949/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index 6add7b2..3c28c3b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -97,10 +97,9 @@ public class KeyedJob {
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
- // TODO: re-enable this when generating the actual 1.2 savepoint
- //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
- map.uid("first");
- //}
+ if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+ map.uid("first");
+ }
return map;
}
@@ -110,10 +109,9 @@ public class KeyedJob {
.map(new StatefulStringStoringMap(mode, "second"))
.setParallelism(4);
- // TODO: re-enable this when generating the actual 1.2 savepoint
- //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
- map.uid("second");
- //}
+ if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+ map.uid("second");
+ }
return map;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b86949/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
index 9e03876..0a1ed10 100644
Binary files a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata and b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata differ