You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:08 UTC
[28/50] [abbrv] flink git commit: [FLINK-4738] [TaskManager] Port
TaskManager logic to new Flip-6 TaskManager
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
new file mode 100644
index 0000000..80f2aa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a problem with checkpointing on the {@link TaskExecutor} side.
+ */
+public class CheckpointException extends TaskManagerException {
+
+ private static final long serialVersionUID = 3366394086880327955L;
+
+ public CheckpointException(String message) {
+ super(message);
+ }
+
+ public CheckpointException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CheckpointException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
new file mode 100644
index 0000000..eecd0ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a problem with the result partitions on the {@link TaskExecutor} side.
+ */
+public class PartitionException extends TaskManagerException {
+
+ private static final long serialVersionUID = 6248696963418276618L;
+
+ public PartitionException(String message) {
+ super(message);
+ }
+
+ public PartitionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PartitionException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
new file mode 100644
index 0000000..a4a89c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a task-related problem on the {@link TaskExecutor}.
+ */
+public class TaskException extends TaskManagerException {
+
+ private static final long serialVersionUID = 968001398103156856L;
+
+ public TaskException(String message) {
+ super(message);
+ }
+
+ public TaskException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TaskException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
new file mode 100644
index 0000000..62d186e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Base exception thrown by the {@link TaskExecutor}.
+ */
+public class TaskManagerException extends Exception {
+
+ private static final long serialVersionUID = -2997745772227694731L;
+
+ public TaskManagerException(String message) {
+ super(message);
+ }
+
+ public TaskManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TaskManagerException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
new file mode 100644
index 0000000..23f7812
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskmanager.Task;
+
+/**
+ * Exception indicating a problem with the {@link Task} submission at the {@link org.apache.flink.runtime.taskexecutor.TaskExecutor}.
+ */
+public class TaskSubmissionException extends TaskManagerException {
+
+ private static final long serialVersionUID = 4589813591317690486L;
+
+ public TaskSubmissionException(String message) {
+ super(message);
+ }
+
+ public TaskSubmissionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TaskSubmissionException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
new file mode 100644
index 0000000..246c11d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.util.Preconditions;
+
+public class RpcCheckpointResponder implements CheckpointResponder {
+
+ private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
+
+ public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) {
+ this.checkpointCoordinatorGateway = Preconditions.checkNotNull(checkpointCoordinatorGateway);
+ }
+
+ @Override
+ public void acknowledgeCheckpoint(
+ JobID jobID,
+ ExecutionAttemptID executionAttemptID,
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles) {
+
+ checkpointCoordinatorGateway.acknowledgeCheckpoint(
+ jobID,
+ executionAttemptID,
+ checkpointMetaData,
+ checkpointStateHandles);
+
+ }
+
+ @Override
+ public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointMetaData checkpoint) {
+ checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpoint);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
new file mode 100644
index 0000000..4850d63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+public class RpcInputSplitProvider implements InputSplitProvider {
+ private final JobMasterGateway jobMasterGateway;
+ private final JobID jobID;
+ private final JobVertexID jobVertexID;
+ private final ExecutionAttemptID executionAttemptID;
+ private final Time timeout;
+
+ public RpcInputSplitProvider(
+ JobMasterGateway jobMasterGateway,
+ JobID jobID,
+ JobVertexID jobVertexID,
+ ExecutionAttemptID executionAttemptID,
+ Time timeout) {
+ this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+ this.jobID = Preconditions.checkNotNull(jobID);
+ this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
+ this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
+
+ @Override
+ public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
+ Preconditions.checkNotNull(userCodeClassLoader);
+
+ Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID);
+
+ try {
+ SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());
+
+ if (serializedInputSplit.isEmpty()) {
+ return null;
+ } else {
+ return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
+ }
+ } catch (Exception e) {
+ throw new InputSplitProviderException("Requesting the next input split failed.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
new file mode 100644
index 0000000..3692a71
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistryGateway;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.Preconditions;
+
+public class RpcKvStateRegistryListener implements KvStateRegistryListener {
+
+ private final KvStateRegistryGateway kvStateRegistryGateway;
+ private final KvStateServerAddress kvStateServerAddress;
+
+ public RpcKvStateRegistryListener(
+ KvStateRegistryGateway kvStateRegistryGateway,
+ KvStateServerAddress kvStateServerAddress) {
+ this.kvStateRegistryGateway = Preconditions.checkNotNull(kvStateRegistryGateway);
+ this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress);
+ }
+
+ @Override
+ public void notifyKvStateRegistered(
+ JobID jobId,
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName,
+ KvStateID kvStateId) {
+ kvStateRegistryGateway.notifyKvStateRegistered(
+ jobId,
+ jobVertexId,
+ keyGroupRange,
+ registrationName,
+ kvStateId,
+ kvStateServerAddress);
+
+ }
+
+ @Override
+ public void notifyKvStateUnregistered(
+ JobID jobId,
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName) {
+
+ kvStateRegistryGateway.notifyKvStateUnregistered(
+ jobId,
+ jobVertexId,
+ keyGroupRange,
+ registrationName);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
new file mode 100644
index 0000000..ab111ad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.util.Preconditions;
+
+public class RpcPartitionStateChecker implements PartitionStateChecker {
+
+ private final JobMasterGateway jobMasterGateway;
+
+ public RpcPartitionStateChecker(JobMasterGateway jobMasterGateway) {
+ this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+ }
+
+ @Override
+ public Future<PartitionState> requestPartitionState(
+ JobID jobId,
+ ExecutionAttemptID executionId,
+ IntermediateDataSetID resultId,
+ ResultPartitionID partitionId) {
+
+ return jobMasterGateway.requestPartitionState(partitionId, executionId, resultId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..29ad3b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+public class RpcResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RpcResultPartitionConsumableNotifier.class);
+
+ private final JobMasterGateway jobMasterGateway;
+ private final Executor executor;
+ private final Time timeout;
+
+ public RpcResultPartitionConsumableNotifier(
+ JobMasterGateway jobMasterGateway,
+ Executor executor,
+ Time timeout) {
+ this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+ @Override
+ public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
+ Future<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
+
+ acknowledgeFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+ @Override
+ public Void apply(Throwable value) {
+ LOG.error("Could not schedule or update consumers at the JobManager.", value);
+
+ taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", value));
+
+ return null;
+ }
+ }, executor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
new file mode 100644
index 0000000..1f8d5ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.utils;
+
+import com.sun.management.OperatingSystemMXBean;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+
+/**
+ * Utility class to initialize {@link TaskExecutor}-specific metrics.
+ */
+public class TaskExecutorMetricsInitializer {
+ private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class);
+
+ public static void instantiateStatusMetrics(
+ MetricGroup taskManagerMetricGroup,
+ NetworkEnvironment network) {
+ MetricGroup status = taskManagerMetricGroup.addGroup("Status");
+
+ instantiateNetworkMetrics(status.addGroup("Network"), network);
+
+ MetricGroup jvm = status.addGroup("JVM");
+
+ instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+ instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+ instantiateMemoryMetrics(jvm.addGroup("Memory"));
+ instantiateThreadMetrics(jvm.addGroup("Threads"));
+ instantiateCPUMetrics(jvm.addGroup("CPU"));
+ }
+
+ private static void instantiateNetworkMetrics(
+ MetricGroup metrics,
+ final NetworkEnvironment network) {
+ metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
+ @Override
+ public Long getValue() {
+ return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+ }
+ });
+
+ metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
+ @Override
+ public Long getValue() {
+ return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+ }
+ });
+ }
+
+ private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
+ final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
+
+ metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
+ @Override
+ public Long getValue() {
+ return mxBean.getTotalLoadedClassCount();
+ }
+ });
+
+ metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
+ @Override
+ public Long getValue() {
+ return mxBean.getUnloadedClassCount();
+ }
+ });
+ }
+
+ private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
+ List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
+
+ for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
+ MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
+
+ gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
+ @Override
+ public Long getValue() {
+ return garbageCollector.getCollectionCount();
+ }
+ });
+
+ gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
+ @Override
+ public Long getValue() {
+ return garbageCollector.getCollectionTime();
+ }
+ });
+ }
+ }
+
+ /**
+  * Registers JVM memory metrics:
+  * <ul>
+  *   <li>"Heap" and "NonHeap" groups, each with "Used", "Committed" and "Max" gauges read
+  *       from the {@link MemoryMXBean}</li>
+  *   <li>"Direct" and "Mapped" groups, each with "Count", "MemoryUsed" and "TotalCapacity"
+  *       gauges read from the corresponding {@code java.nio} buffer pool MBean</li>
+  * </ul>
+  *
+  * @param metrics metric group under which the memory groups are added
+  */
+ private static void instantiateMemoryMetrics(MetricGroup metrics) {
+     final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+
+     MetricGroup heap = metrics.addGroup("Heap");
+
+     heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+         @Override
+         public Long getValue() {
+             return mxBean.getHeapMemoryUsage().getUsed();
+         }
+     });
+     heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
+         @Override
+         public Long getValue() {
+             return mxBean.getHeapMemoryUsage().getCommitted();
+         }
+     });
+     heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+         @Override
+         public Long getValue() {
+             return mxBean.getHeapMemoryUsage().getMax();
+         }
+     });
+
+     MetricGroup nonHeap = metrics.addGroup("NonHeap");
+
+     nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+         @Override
+         public Long getValue() {
+             return mxBean.getNonHeapMemoryUsage().getUsed();
+         }
+     });
+     nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
+         @Override
+         public Long getValue() {
+             return mxBean.getNonHeapMemoryUsage().getCommitted();
+         }
+     });
+     nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+         @Override
+         public Long getValue() {
+             return mxBean.getNonHeapMemoryUsage().getMax();
+         }
+     });
+
+     final MBeanServer con = ManagementFactory.getPlatformMBeanServer();
+
+     // both buffer pools expose the same attributes, only group and object name differ
+     instantiateBufferPoolMetrics(metrics, con, "Direct", "java.nio:type=BufferPool,name=direct");
+     instantiateBufferPoolMetrics(metrics, con, "Mapped", "java.nio:type=BufferPool,name=mapped");
+ }
+
+ /**
+  * Registers the "Count", "MemoryUsed" and "TotalCapacity" gauges of one buffer pool MBean in
+  * a new sub-group. Each gauge is an {@code AttributeGauge} created with {@code -1L} as its
+  * error value. If the object name cannot be parsed, a warning is logged and neither the
+  * group nor any gauge is created.
+  *
+  * @param metrics parent metric group
+  * @param server MBean server the attributes are read from
+  * @param groupName name of the sub-group to create
+  * @param bufferPoolName string form of the buffer pool's {@link ObjectName}
+  */
+ private static void instantiateBufferPoolMetrics(
+         MetricGroup metrics,
+         MBeanServer server,
+         String groupName,
+         String bufferPoolName) {
+
+     final ObjectName poolObjectName;
+
+     try {
+         poolObjectName = new ObjectName(bufferPoolName);
+     } catch (MalformedObjectNameException e) {
+         LOG.warn("Could not create object name {}.", bufferPoolName, e);
+         return;
+     }
+
+     final MetricGroup pool = metrics.addGroup(groupName);
+
+     // the gauge name is identical to the MBean attribute it reads
+     for (String attribute : new String[] {"Count", "MemoryUsed", "TotalCapacity"}) {
+         pool.<Long, Gauge<Long>>gauge(attribute, new TaskExecutorMetricsInitializer.AttributeGauge<>(server, poolObjectName, attribute, -1L));
+     }
+ }
+
+ private static void instantiateThreadMetrics(MetricGroup metrics) {
+ final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+ metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
+ @Override
+ public Integer getValue() {
+ return mxBean.getThreadCount();
+ }
+ });
+ }
+
+ private static void instantiateCPUMetrics(MetricGroup metrics) {
+ try {
+ final OperatingSystemMXBean mxBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+
+ metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
+ @Override
+ public Double getValue() {
+ return mxBean.getProcessCpuLoad();
+ }
+ });
+ metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
+ @Override
+ public Long getValue() {
+ return mxBean.getProcessCpuTime();
+ }
+ });
+ } catch (Exception e) {
+ LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+ " - CPU load metrics will not be available.", e);
+ }
+ }
+
+ private static final class AttributeGauge<T> implements Gauge<T> {
+ private final MBeanServer server;
+ private final ObjectName objectName;
+ private final String attributeName;
+ private final T errorValue;
+
+ private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
+ this.server = Preconditions.checkNotNull(server);
+ this.objectName = Preconditions.checkNotNull(objectName);
+ this.attributeName = Preconditions.checkNotNull(attributeName);
+ this.errorValue = errorValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T getValue() {
+ try {
+ return (T) server.getAttribute(objectName, attributeName);
+ } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+ LOG.warn("Could not read attribute {}.", attributeName, e);
+ return errorValue;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
new file mode 100644
index 0000000..b3a0cbb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link TaskManagerActions} implementation which wraps every call into the matching
+ * actor message and sends it through an {@link ActorGateway}.
+ */
+public class ActorGatewayTaskManagerActions implements TaskManagerActions {
+
+ /** Gateway to which all messages are sent. */
+ private final ActorGateway gateway;
+
+ /**
+  * @param actorGateway gateway used to send the messages, must not be null
+  */
+ public ActorGatewayTaskManagerActions(ActorGateway actorGateway) {
+     Preconditions.checkNotNull(actorGateway);
+     this.gateway = actorGateway;
+ }
+
+ /** Sends a {@code TaskInFinalState} message for the given execution attempt. */
+ @Override
+ public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
+     final TaskMessages.TaskInFinalState finalStateMessage =
+         new TaskMessages.TaskInFinalState(executionAttemptID);
+
+     gateway.tell(finalStateMessage);
+ }
+
+ /** Sends a {@code FatalError} message carrying the given message and cause. */
+ @Override
+ public void notifyFatalError(String message, Throwable cause) {
+     final TaskManagerMessages.FatalError fatalErrorMessage =
+         new TaskManagerMessages.FatalError(message, cause);
+
+     gateway.tell(fatalErrorMessage);
+ }
+
+ /** Sends a {@code FailTask} message for the given execution attempt and cause. */
+ @Override
+ public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
+     final TaskMessages.FailTask failTaskMessage =
+         new TaskMessages.FailTask(executionAttemptID, cause);
+
+     gateway.tell(failTaskMessage);
+ }
+
+ /** Sends an {@code UpdateTaskExecutionState} message wrapping the given state. */
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+     gateway.tell(new TaskMessages.UpdateTaskExecutionState(taskExecutionState));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
deleted file mode 100644
index cddac55..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskManagerConnection implements TaskManagerConnection {
-
- private final ActorGateway actorGateway;
-
- public ActorGatewayTaskManagerConnection(ActorGateway actorGateway) {
- this.actorGateway = Preconditions.checkNotNull(actorGateway);
- }
-
- @Override
- public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
- actorGateway.tell(new TaskMessages.TaskInFinalState(executionAttemptID));
- }
-
- @Override
- public void notifyFatalError(String message, Throwable cause) {
- actorGateway.tell(new TaskManagerMessages.FatalError(message, cause));
- }
-
- @Override
- public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
- actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, cause));
- }
-
- @Override
- public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
- TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
- actorGateway.tell(actorMessage);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bd522bd..37ac0a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -126,6 +127,9 @@ public class Task implements Runnable, TaskActions {
/** The execution attempt of the parallel subtask */
private final ExecutionAttemptID executionId;
+ /** ID which identifies the slot in which the task is supposed to run */
+ private final AllocationID allocationID;
+
/** TaskInfo object for this task */
private final TaskInfo taskInfo;
@@ -171,7 +175,7 @@ public class Task implements Runnable, TaskActions {
private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
/** Connection to the task manager */
- private final TaskManagerConnection taskManagerConnection;
+ private final TaskManagerActions taskManagerActions;
/** Input split provider for the task */
private final InputSplitProvider inputSplitProvider;
@@ -246,7 +250,7 @@ public class Task implements Runnable, TaskActions {
IOManager ioManager,
NetworkEnvironment networkEnvironment,
BroadcastVariableManager bcVarManager,
- TaskManagerConnection taskManagerConnection,
+ TaskManagerActions taskManagerActions,
InputSplitProvider inputSplitProvider,
CheckpointResponder checkpointResponder,
LibraryCacheManager libraryCache,
@@ -261,6 +265,7 @@ public class Task implements Runnable, TaskActions {
this.jobId = checkNotNull(tdd.getJobID());
this.vertexId = checkNotNull(tdd.getVertexID());
this.executionId = checkNotNull(tdd.getExecutionId());
+ this.allocationID = checkNotNull(tdd.getAllocationID());
this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
@@ -281,7 +286,7 @@ public class Task implements Runnable, TaskActions {
this.inputSplitProvider = checkNotNull(inputSplitProvider);
this.checkpointResponder = checkNotNull(checkpointResponder);
- this.taskManagerConnection = checkNotNull(taskManagerConnection);
+ this.taskManagerActions = checkNotNull(taskManagerActions);
this.libraryCache = checkNotNull(libraryCache);
this.fileCache = checkNotNull(fileCache);
@@ -365,6 +370,10 @@ public class Task implements Runnable, TaskActions {
return executionId;
}
+ public AllocationID getAllocationID() {
+ return allocationID;
+ }
+
public TaskInfo getTaskInfo() {
return taskInfo;
}
@@ -584,7 +593,7 @@ public class Task implements Runnable, TaskActions {
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
- taskManagerConnection.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
+ taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
@@ -777,11 +786,11 @@ public class Task implements Runnable, TaskActions {
}
private void notifyFinalState() {
- taskManagerConnection.notifyFinalState(executionId);
+ taskManagerActions.notifyFinalState(executionId);
}
private void notifyFatalError(String message, Throwable cause) {
- taskManagerConnection.notifyFatalError(message, cause);
+ taskManagerActions.notifyFatalError(message, cause);
}
// ----------------------------------------------------------------------------------------------------------------
@@ -807,7 +816,7 @@ public class Task implements Runnable, TaskActions {
((StoppableTask)Task.this.invokable).stop();
} catch(RuntimeException e) {
LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
- taskManagerConnection.failTask(executionId, e);
+ taskManagerActions.failTask(executionId, e);
}
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 60aadf5..877cc1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.util.SerializedThrowable;
+import java.io.Serializable;
+
/**
* This class represents an update about a task's execution state.
*
@@ -34,7 +36,7 @@ import org.apache.flink.runtime.util.SerializedThrowable;
* exception field transient and deserialized it lazily, with the
* appropriate class loader.
*/
-public class TaskExecutionState implements java.io.Serializable {
+public class TaskExecutionState implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 60beae0..31c518a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.InstantiationUtil;
@@ -63,35 +64,45 @@ public class TaskInputSplitProvider implements InputSplitProvider {
}
@Override
- public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+ public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);
+ final Future<Object> response = jobManager.ask(
+ new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
+ timeout);
+
+ final Object result;
+
try {
- final Future<Object> response = jobManager.ask(
- new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
- timeout);
+ result = Await.result(response, timeout);
+ } catch (Exception e) {
+ throw new InputSplitProviderException("Did not receive next input split from JobManager.", e);
+ }
- final Object result = Await.result(response, timeout);
+ if(result instanceof JobManagerMessages.NextInputSplit){
+ final JobManagerMessages.NextInputSplit nextInputSplit =
+ (JobManagerMessages.NextInputSplit) result;
- if(result instanceof JobManagerMessages.NextInputSplit){
- final JobManagerMessages.NextInputSplit nextInputSplit =
- (JobManagerMessages.NextInputSplit) result;
+ byte[] serializedData = nextInputSplit.splitData();
- byte[] serializedData = nextInputSplit.splitData();
+ if(serializedData == null) {
+ return null;
+ } else {
+ final Object deserialized;
- if(serializedData == null) {
- return null;
- } else {
- Object deserialized = InstantiationUtil.deserializeObject(serializedData,
+ try {
+ deserialized = InstantiationUtil.deserializeObject(serializedData,
userCodeClassLoader);
- return (InputSplit) deserialized;
+ } catch (Exception e) {
+ throw new InputSplitProviderException("Could not deserialize the serialized input split.", e);
}
- } else {
- throw new Exception("RequestNextInputSplit requires a response of type " +
- "NextInputSplit. Instead response is of type " + result.getClass() + '.');
+
+ return (InputSplit) deserialized;
}
- } catch (Exception e) {
- throw new RuntimeException("Requesting the next InputSplit failed.", e);
+ } else {
+ throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " +
+ "NextInputSplit. Instead response is of type " + result.getClass() + '.');
}
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
new file mode 100644
index 0000000..2f3a0cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Interface for the communication of the {@link Task} with the {@link TaskManager}.
+ *
+ * <p>It covers four notifications: a task reached a final state, a fatal error
+ * occurred, a task should be failed, and a task's execution state changed.
+ */
+public interface TaskManagerActions {
+
+ /**
+  * Notifies the task manager that the given task is in a final state.
+  *
+  * @param executionAttemptID Execution attempt ID of the task
+  */
+ void notifyFinalState(ExecutionAttemptID executionAttemptID);
+
+ /**
+  * Notifies the task manager about a fatal error that occurred in the task.
+  *
+  * @param message Message to report
+  * @param cause Cause of the fatal error
+  */
+ void notifyFatalError(String message, Throwable cause);
+
+ /**
+  * Tells the task manager to fail the given task.
+  *
+  * @param executionAttemptID Execution attempt ID of the task to fail
+  * @param cause Cause of the failure
+  */
+ void failTask(ExecutionAttemptID executionAttemptID, Throwable cause);
+
+ /**
+  * Notifies the task manager about an update of a task's execution state.
+  *
+  * @param taskExecutionState Task execution state update
+  */
+ void updateTaskExecutionState(TaskExecutionState taskExecutionState);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
deleted file mode 100644
index dc1b40f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-/**
- * Interface for the communication of the {@link Task} with the {@link TaskManager}.
- */
-public interface TaskManagerConnection {
-
- /**
- * Notifies the task manager that the given task is in a final state.
- *
- * @param executionAttemptID Execution attempt ID of the task
- */
- void notifyFinalState(ExecutionAttemptID executionAttemptID);
-
- /**
- * Notifies the task manager about a fatal error occurred in the task.
- *
- * @param message Message to report
- * @param cause Cause of the fatal error
- */
- void notifyFatalError(String message, Throwable cause);
-
- /**
- * Tells the task manager to fail the given task.
- *
- * @param executionAttemptID Execution attempt ID of the task to fail
- * @param cause Cause of the failure
- */
- void failTask(ExecutionAttemptID executionAttemptID, Throwable cause);
-
- /**
- * Notifies the task manager about the task execution state update.
- *
- * @param taskExecutionState Task execution state update
- */
- void updateTaskExecutionState(TaskExecutionState taskExecutionState);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7501da7..7364ee0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -25,7 +25,6 @@ import java.net.{InetAddress, InetSocketAddress}
import java.util
import java.util.UUID
import java.util.concurrent.TimeUnit
-import javax.management.ObjectName
import _root_.akka.actor._
import _root_.akka.pattern.ask
@@ -38,7 +37,6 @@ import grizzled.slf4j.Logger
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.clusterframework.messages.StopCluster
import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -60,7 +58,7 @@ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, Leader
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.Messages._
import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, SampleTaskStackTrace, StackTraceSampleMessages, TriggerStackTraceSample}
+import org.apache.flink.runtime.messages.StackTraceSampleMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
@@ -69,7 +67,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
import org.apache.flink.runtime.security.SecurityContext
-import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor._
+import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.NetUtils
@@ -151,7 +150,7 @@ class TaskManager(
protected val bcVarManager = new BroadcastVariableManager()
/** Handler for distributed files cached by this TaskManager */
- protected val fileCache = new FileCache(config.getConfiguration())
+ protected val fileCache = new FileCache(config.getTmpDirPaths())
/** Registry of metrics periodically transmitted to the JobManager */
private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -197,7 +196,7 @@ class TaskManager(
CheckpointResponder,
PartitionStateChecker,
ResultPartitionConsumableNotifier,
- TaskManagerConnection)] = None
+ TaskManagerActions)] = None
// --------------------------------------------------------------------------
// Actor messages and life cycle
@@ -941,9 +940,9 @@ class TaskManager(
val jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID.orNull)
val taskManagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
- val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway);
+ val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway)
- val taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway)
+ val taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway)
val partitionStateChecker = new ActorGatewayPartitionStateChecker(
jobManagerGateway,
@@ -999,7 +998,7 @@ class TaskManager(
taskManagerMetricGroup =
new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
- TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
+ TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network)
// watch job manager to detect when it dies
context.watch(jobManager)
@@ -2017,23 +2016,23 @@ object TaskManager {
// Pre-processing steps for registering cpuLoad
val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
-
- val fetchCPULoadMethod: Option[Method] =
+
+ val fetchCPULoadMethod: Option[Method] =
try {
Class.forName("com.sun.management.OperatingSystemMXBean")
.getMethods()
- .find( _.getName() == "getProcessCpuLoad" )
+ .find(_.getName() == "getProcessCpuLoad")
}
catch {
case t: Throwable =>
LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
- " - CPU load metrics will not be available.")
+ " - CPU load metrics will not be available.")
None
}
metricRegistry.register("cpuLoad", new Gauge[Double] {
override def getValue: Double = {
- try{
+ try {
fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
}
catch {
@@ -2045,146 +2044,4 @@ object TaskManager {
})
metricRegistry
}
-
- private def instantiateStatusMetrics(
- taskManagerMetricGroup: MetricGroup,
- network: NetworkEnvironment)
- : Unit = {
- val status = taskManagerMetricGroup
- .addGroup("Status")
-
- instantiateNetworkMetrics(status.addGroup("Network"), network)
-
- val jvm = status
- .addGroup("JVM")
-
- instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
- instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
- instantiateMemoryMetrics(jvm.addGroup("Memory"))
- instantiateThreadMetrics(jvm.addGroup("Threads"))
- instantiateCPUMetrics(jvm.addGroup("CPU"))
- }
-
- private def instantiateNetworkMetrics(
- metrics: MetricGroup,
- network: NetworkEnvironment)
- : Unit = {
- metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
- override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
- })
- metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
- override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
- })
- }
-
- private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
- val mxBean = ManagementFactory.getClassLoadingMXBean
-
- metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getTotalLoadedClassCount
- })
- metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getUnloadedClassCount
- })
- }
-
- private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
- val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
-
- for (garbageCollector <- garbageCollectors.asScala) {
- val gcGroup = metrics.addGroup(garbageCollector.getName)
- gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
- override def getValue: Long = garbageCollector.getCollectionCount
- })
- gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
- override def getValue: Long = garbageCollector.getCollectionTime
- })
- }
- }
-
- private def instantiateMemoryMetrics(metrics: MetricGroup) {
- val mxBean = ManagementFactory.getMemoryMXBean
- val heap = metrics.addGroup("Heap")
- heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
- })
- heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
- })
- heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
- })
-
- val nonHeap = metrics.addGroup("NonHeap")
- nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
- })
- nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
- })
- nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
- })
-
- val con = ManagementFactory.getPlatformMBeanServer;
-
- val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
-
- val direct = metrics.addGroup("Direct")
- direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "Count").asInstanceOf[Long]
- })
- direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
- })
- direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
- })
-
- val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped")
-
- val mapped = metrics.addGroup("Mapped")
- mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
- })
- mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
- })
- mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
- })
- }
-
- private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
- val mxBean = ManagementFactory.getThreadMXBean
-
- metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
- override def getValue: Int = mxBean.getThreadCount
- })
- }
-
- private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
- try {
- val mxBean = ManagementFactory.getOperatingSystemMXBean
- .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
- metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
- override def getValue: Double = mxBean.getProcessCpuLoad
- })
- metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getProcessCpuTime
- })
- }
- catch {
- case t: Throwable =>
- LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
- " - CPU load metrics will not be available.")
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
index c369674..4db0d93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.filecache;
import java.io.File;
import java.util.concurrent.Future;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.JobID;
@@ -62,8 +61,9 @@ public class FileCacheDeleteValidationTest {
@Before
public void setup() {
+ String[] tmpDirectories = System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator);
try {
- fileCache = new FileCache(new Configuration());
+ fileCache = new FileCache(tmpDirectories);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index f8a0b6a..30dfef5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -42,7 +42,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 7710fa9..f5fe52c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.highavailability.NonHaServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -29,6 +32,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
@@ -38,7 +42,6 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
-import org.hamcrest.Matchers;
import org.junit.Test;
import org.powermock.api.mockito.PowerMockito;
@@ -60,10 +63,14 @@ public class TaskExecutorTest extends TestLogger {
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+ PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+ PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
+
rpc.registerGateway(resourceManagerAddress, rmGateway);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+ when(taskManagerLocation.getHostname()).thenReturn("foobar");
NonHaServices haServices = new NonHaServices(resourceManagerAddress);
@@ -76,6 +83,9 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(MetricRegistry.class),
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
mock(FatalErrorHandler.class));
taskManager.start();
@@ -113,9 +123,12 @@ public class TaskExecutorTest extends TestLogger {
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+ PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+ PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+ when(taskManagerLocation.getHostname()).thenReturn("foobar");
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
@@ -126,6 +139,9 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(MetricRegistry.class),
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
mock(FatalErrorHandler.class));
taskManager.start();
@@ -182,9 +198,12 @@ public class TaskExecutorTest extends TestLogger {
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+ PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+ PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+ when(taskManagerLocation.getHostname()).thenReturn("foobar");
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
@@ -195,6 +214,9 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(MetricRegistry.class),
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
mock(FatalErrorHandler.class));
taskManager.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7dd67ed..3a87d86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -174,7 +174,7 @@ public class TaskAsyncCallTest {
mock(IOManager.class),
networkEnvironment,
mock(BroadcastVariableManager.class),
- mock(TaskManagerConnection.class),
+ mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
libCache,
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
index 642300d..777633d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
@@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit;
public class TaskInputSplitProviderTest {
@Test
- public void testRequestNextInputSplitWithInvalidExecutionID() {
+ public void testRequestNextInputSplitWithInvalidExecutionID() throws InputSplitProviderException {
final JobID jobID = new JobID();
final JobVertexID vertexID = new JobVertexID();
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 9791cee..5d3eb3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -68,6 +69,7 @@ public class TaskStopTest {
when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
when(tddMock.getInvokableClassName()).thenReturn("className");
+ when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class));
task = new Task(
tddMock,
@@ -75,7 +77,7 @@ public class TaskStopTest {
mock(IOManager.class),
mock(NetworkEnvironment.class),
mock(BroadcastVariableManager.class),
- mock(TaskManagerConnection.class),
+ mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
mock(LibraryCacheManager.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 9a13cde..fe618ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -97,7 +97,7 @@ public class TaskTest {
private ActorGateway listenerGateway;
private ActorGatewayTaskExecutionStateListener listener;
- private ActorGatewayTaskManagerConnection taskManagerConnection;
+ private ActorGatewayTaskManagerActions taskManagerConnection;
private BlockingQueue<Object> taskManagerMessages;
private BlockingQueue<Object> jobManagerMessages;
@@ -113,7 +113,7 @@ public class TaskTest {
listenerGateway = new ForwardingActorGateway(listenerMessages);
listener = new ActorGatewayTaskExecutionStateListener(listenerGateway);
- taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway);
+ taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway);
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index 343affe..c067ca7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import java.util.Iterator;
@@ -146,7 +147,12 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
return true;
}
- InputSplit split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
+ final InputSplit split;
+ try {
+ split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
+ } catch (InputSplitProviderException e) {
+ throw new RuntimeException("Could not retrieve next input split.", e);
+ }
if (split != null) {
this.nextSplit = split;
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index ee5a203..861665f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -60,6 +60,7 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import java.io.EOFException;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
@@ -161,17 +162,19 @@ public class InterruptSensitiveRestoreTest {
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
+ String[] tmpDirectories = EnvironmentInformation.getTemporaryFileDirectory().split(",|" + File.pathSeparator);
+
return new Task(
tdd,
mock(MemoryManager.class),
mock(IOManager.class),
networkEnvironment,
mock(BroadcastVariableManager.class),
- mock(TaskManagerConnection.class),
+ mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
new FallbackLibraryCacheManager(),
- new FileCache(new Configuration()),
+ new FileCache(tmpDirectories),
new TaskManagerRuntimeInfo(
"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
new UnregisteredTaskMetricsGroup(),
@@ -274,4 +277,4 @@ public class InterruptSensitiveRestoreTest {
fail("should never be called");
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 94f6d5a..ee0839f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -238,7 +238,7 @@ public class StreamTaskTest {
mock(IOManager.class),
network,
mock(BroadcastVariableManager.class),
- mock(TaskManagerConnection.class),
+ mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
libCache,