You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/11 17:45:30 UTC
[1/4] flink git commit: [FLINK-7017] Remove netty usages in flink-tests
Repository: flink
Updated Branches:
refs/heads/master 211d0963e -> 021d27d54
[FLINK-7017] Remove netty usages in flink-tests
This closes #4196.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed3b3266
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed3b3266
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed3b3266
Branch: refs/heads/master
Commit: ed3b3266cdb4811ffb3539640c57354ae71adc24
Parents: 748eba1
Author: zentol <ch...@apache.org>
Authored: Tue Jun 27 15:08:50 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:19:25 2017 +0200
----------------------------------------------------------------------
flink-runtime-web/pom.xml | 7 +
.../runtime/webmonitor/WebFrontendITCase.java | 290 ++++++++++++++++++
.../test/checkpointing/RescalingITCase.java | 3 +-
.../flink/test/web/WebFrontendITCase.java | 300 -------------------
4 files changed, 299 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 040e844..988cd76 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -83,6 +83,13 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
new file mode 100644
index 0000000..711bbdb
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the WebFrontend.
+ */
+public class WebFrontendITCase extends TestLogger {
+
+ private static final int NUM_TASK_MANAGERS = 2;
+ private static final int NUM_SLOTS = 4;
+
+ private static LocalFlinkMiniCluster cluster;
+
+ private static int port = -1;
+
+ @BeforeClass
+ public static void initialize() throws Exception {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+ config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
+ File logDir = File.createTempFile("TestBaseUtils-logdir", null);
+ assertTrue("Unable to delete temp file", logDir.delete());
+ assertTrue("Unable to create temp directory", logDir.mkdir());
+ File logFile = new File(logDir, "jobmanager.log");
+ File outFile = new File(logDir, "jobmanager.out");
+
+ Files.createFile(logFile.toPath());
+ Files.createFile(outFile.toPath());
+
+ config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
+ config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+
+ cluster = new LocalFlinkMiniCluster(config, false);
+ cluster.start();
+
+ port = cluster.webMonitor().get().getServerPort();
+ }
+
+ @Test
+ public void getFrontPage() {
+ try {
+ String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
+ String text = "Apache Flink Dashboard";
+ assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void getNumberOfTaskManagers() {
+ try {
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode response = mapper.readTree(json);
+ ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
+
+ assertNotNull(taskManagers);
+ assertEquals(cluster.numTaskManagers(), taskManagers.size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void getTaskmanagers() {
+ try {
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode parsed = mapper.readTree(json);
+ ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
+
+ assertNotNull(taskManagers);
+ assertEquals(cluster.numTaskManagers(), taskManagers.size());
+
+ JsonNode taskManager = taskManagers.get(0);
+ assertNotNull(taskManager);
+ assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
+ assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void getLogAndStdoutFiles() throws Exception {
+ WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+
+ FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
+ String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log");
+ assertTrue(logs.contains("job manager log"));
+
+ FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
+ logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
+ assertTrue(logs.contains("job manager out"));
+ }
+
+ @Test
+ public void getTaskManagerLogAndStdoutFiles() {
+ try {
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode parsed = mapper.readTree(json);
+ ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
+ JsonNode taskManager = taskManagers.get(0);
+ String id = taskManager.get("id").asText();
+
+ WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+
+ //we check for job manager log files, since no separate taskmanager logs exist
+ FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
+ String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
+ assertTrue(logs.contains("job manager log"));
+
+ FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
+ logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
+ assertTrue(logs.contains("job manager out"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void getConfiguration() {
+ try {
+ String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config");
+
+ Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
+ assertEquals(
+ cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
+ conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testStop() throws Exception {
+ // this only works if there is no active job at this point
+ assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+
+ // Create a task
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setParallelism(2);
+ sender.setInvokableClass(StoppableInvokable.class);
+
+ final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+ final JobID jid = jobGraph.getJobID();
+
+ cluster.submitJobDetached(jobGraph);
+
+ // wait for job to show up
+ while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ Thread.sleep(10);
+ }
+
+ final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
+ final Deadline deadline = testTimeout.fromNow();
+
+ while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+ // Request the file from the web server
+ client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
+ HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+ assertEquals(HttpResponseStatus.OK, response.getStatus());
+ assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
+ assertEquals("{}", response.getContent());
+ }
+
+ Thread.sleep(20);
+ }
+
+ // ensure we can access job details when its finished (FLINK-4011)
+ try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+ FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+ client.sendGetRequest("/jobs/" + jid + "/config", timeout);
+ HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
+
+ assertEquals(HttpResponseStatus.OK, response.getStatus());
+ assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
+ assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," +
+ "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
+ "\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
+ }
+ }
+
+ @Test
+ public void testStopYarn() throws Exception {
+ // this only works if there is no active job at this point
+ assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+
+ // Create a task
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setParallelism(2);
+ sender.setInvokableClass(StoppableInvokable.class);
+
+ final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+ final JobID jid = jobGraph.getJobID();
+
+ cluster.submitJobDetached(jobGraph);
+
+ // wait for job to show up
+ while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ Thread.sleep(10);
+ }
+
+ final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
+ final Deadline deadline = testTimeout.fromNow();
+
+ while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+ // Request the file from the web server
+ client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
+
+ HttpTestClient.SimpleHttpResponse response = client
+ .getNextResponse(deadline.timeLeft());
+
+ assertEquals(HttpResponseStatus.OK, response.getStatus());
+ assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
+ assertEquals("{}", response.getContent());
+ }
+
+ Thread.sleep(20);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index e934c27..264b22e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -70,6 +70,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -866,7 +867,7 @@ public class RescalingITCase extends TestLogger {
private static class CollectionSink<IN> implements SinkFunction<IN> {
- private static ConcurrentSet<Object> elements = new ConcurrentSet<Object>();
+ private static Set<Object> elements = Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());
private static final long serialVersionUID = -1652452958040267745L;
http://git-wip-us.apache.org/repos/asf/flink/blob/ed3b3266/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
deleted file mode 100644
index 538ac98..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.web;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-import org.apache.commons.io.FileUtils;
-
-import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.files.MimeTypes;
-import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.TestBaseUtils;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import io.netty.handler.codec.http.HttpResponseStatus;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import static org.apache.flink.test.util.TestBaseUtils.getFromHTTP;
-
-public class WebFrontendITCase extends TestLogger {
-
- private static final int NUM_TASK_MANAGERS = 2;
- private static final int NUM_SLOTS = 4;
-
- private static LocalFlinkMiniCluster cluster;
-
- private static int port = -1;
-
- @BeforeClass
- public static void initialize() throws Exception {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-
- File logDir = File.createTempFile("TestBaseUtils-logdir", null);
- assertTrue("Unable to delete temp file", logDir.delete());
- assertTrue("Unable to create temp directory", logDir.mkdir());
- File logFile = new File(logDir, "jobmanager.log");
- File outFile = new File(logDir, "jobmanager.out");
-
- Files.createFile(logFile.toPath());
- Files.createFile(outFile.toPath());
-
- config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
- config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
-
- cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
-
- port = cluster.webMonitor().get().getServerPort();
- }
-
- @Test
- public void getFrontPage() {
- try {
- String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
- String text = "Apache Flink Dashboard";
- assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void getNumberOfTaskManagers() {
- try {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
-
- ObjectMapper mapper = new ObjectMapper();
- JsonNode response = mapper.readTree(json);
- ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
-
- assertNotNull(taskManagers);
- assertEquals(cluster.numTaskManagers(), taskManagers.size());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void getTaskmanagers() {
- try {
- String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/");
-
- ObjectMapper mapper = new ObjectMapper();
- JsonNode parsed = mapper.readTree(json);
- ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
-
- assertNotNull(taskManagers);
- assertEquals(cluster.numTaskManagers(), taskManagers.size());
-
- JsonNode taskManager = taskManagers.get(0);
- assertNotNull(taskManager);
- assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
- assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
- }
- catch(Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void getLogAndStdoutFiles() throws Exception {
- WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
-
- FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
- String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log");
- assertTrue(logs.contains("job manager log"));
-
- FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
- logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
- assertTrue(logs.contains("job manager out"));
- }
-
- @Test
- public void getTaskManagerLogAndStdoutFiles() {
- try {
- String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/");
-
- ObjectMapper mapper = new ObjectMapper();
- JsonNode parsed = mapper.readTree(json);
- ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
- JsonNode taskManager = taskManagers.get(0);
- String id = taskManager.get("id").asText();
-
- WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
-
- //we check for job manager log files, since no separate taskmanager logs exist
- FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
- String logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
- assertTrue(logs.contains("job manager log"));
-
- FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
- logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
- assertTrue(logs.contains("job manager out"));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void getConfiguration() {
- try {
- String config = getFromHTTP("http://localhost:" + port + "/jobmanager/config");
-
- Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
- assertEquals(
- cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
- conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testStop() throws Exception {
- // this only works if there is no active job at this point
- assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
-
- // Create a task
- final JobVertex sender = new JobVertex("Sender");
- sender.setParallelism(2);
- sender.setInvokableClass(StoppableInvokable.class);
-
- final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
- final JobID jid = jobGraph.getJobID();
-
- cluster.submitJobDetached(jobGraph);
-
- // wait for job to show up
- while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- Thread.sleep(10);
- }
-
- final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
- final Deadline deadline = testTimeout.fromNow();
-
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
- // Request the file from the web server
- client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
- HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-
- assertEquals(HttpResponseStatus.OK, response.getStatus());
- assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
- assertEquals("{}", response.getContent());
- }
-
- Thread.sleep(20);
- }
-
- // ensure we can access job details when its finished (FLINK-4011)
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
- FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
- client.sendGetRequest("/jobs/" + jid + "/config", timeout);
- HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
-
- assertEquals(HttpResponseStatus.OK, response.getStatus());
- assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
- assertEquals("{\"jid\":\""+jid+"\",\"name\":\"Stoppable streaming test job\"," +
- "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
- "\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
- }
- }
-
- @Test
- public void testStopYarn() throws Exception {
- // this only works if there is no active job at this point
- assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
-
- // Create a task
- final JobVertex sender = new JobVertex("Sender");
- sender.setParallelism(2);
- sender.setInvokableClass(StoppableInvokable.class);
-
- final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
- final JobID jid = jobGraph.getJobID();
-
- cluster.submitJobDetached(jobGraph);
-
- // wait for job to show up
- while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- Thread.sleep(10);
- }
-
- final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
- final Deadline deadline = testTimeout.fromNow();
-
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
- // Request the file from the web server
- client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
-
- HttpTestClient.SimpleHttpResponse response = client
- .getNextResponse(deadline.timeLeft());
-
- assertEquals(HttpResponseStatus.OK, response.getStatus());
- assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
- assertEquals("{}", response.getContent());
- }
-
- Thread.sleep(20);
- }
- }
-}
[3/4] flink git commit: [FLINK-7052][blob] remove unused NAME_ADDRESSABLE mode
Posted by ch...@apache.org.
[FLINK-7052][blob] remove unused NAME_ADDRESSABLE mode
This closes #4158.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f68856af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f68856af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f68856af
Branch: refs/heads/master
Commit: f68856afb0628adcc7df223ac520df6abdf716cb
Parents: 211d096
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Jun 16 10:51:04 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:19:25 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobClient.java | 252 ++-----------------
.../apache/flink/runtime/blob/BlobServer.java | 27 --
.../runtime/blob/BlobServerConnection.java | 217 +++-------------
.../flink/runtime/blob/BlobServerProtocol.java | 16 +-
.../apache/flink/runtime/blob/BlobStore.java | 22 --
.../apache/flink/runtime/blob/BlobUtils.java | 80 +-----
.../org/apache/flink/runtime/blob/BlobView.java | 10 -
.../flink/runtime/blob/FileSystemBlobStore.java | 15 --
.../flink/runtime/blob/VoidBlobStore.java | 11 -
.../flink/runtime/blob/BlobClientSslTest.java | 49 ----
.../flink/runtime/blob/BlobClientTest.java | 96 -------
.../flink/runtime/blob/BlobRecoveryITCase.java | 26 --
.../runtime/blob/BlobServerDeleteTest.java | 213 +---------------
.../flink/runtime/blob/BlobServerPutTest.java | 133 +---------
.../BlobLibraryCacheManagerTest.java | 13 +-
15 files changed, 83 insertions(+), 1097 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 0f02e75..ce59d75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -55,9 +55,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
@@ -139,46 +136,6 @@ public final class BlobClient implements Closeable {
// --------------------------------------------------------------------------------------------
/**
- * Downloads the BLOB identified by the given job ID and key from the BLOB server. If no such BLOB exists on the
- * server, a {@link FileNotFoundException} is thrown.
- *
- * @param jobID
- * the job ID identifying the BLOB to download
- * @param key
- * the key identifying the BLOB to download
- * @return an input stream to read the retrieved data from
- * @throws IOException
- * thrown if an I/O error occurs during the download
- */
- public InputStream get(JobID jobID, String key) throws IOException {
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
- }
-
- if (this.socket.isClosed()) {
- throw new IllegalStateException("BLOB Client is not connected. " +
- "Client has been shut down or encountered an error before.");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("GET BLOB %s / \"%s\" from %s", jobID, key, socket.getLocalSocketAddress()));
- }
-
- try {
- OutputStream os = this.socket.getOutputStream();
- InputStream is = this.socket.getInputStream();
-
- sendGetHeader(os, jobID, key, null);
- receiveAndCheckResponse(is);
-
- return new BlobInputStream(is, null);
- }
- catch (Throwable t) {
- BlobUtils.closeSilently(socket, LOG);
- throw new IOException("GET operation failed: " + t.getMessage(), t);
- }
- }
-
- /**
* Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a
* {@link FileNotFoundException} is thrown.
*
@@ -202,7 +159,7 @@ public final class BlobClient implements Closeable {
InputStream is = this.socket.getInputStream();
// Send GET header
- sendGetHeader(os, null, null, blobKey);
+ sendGetHeader(os, null, blobKey);
receiveAndCheckResponse(is);
return new BlobInputStream(is, blobKey);
@@ -221,33 +178,23 @@ public final class BlobClient implements Closeable {
* @param jobID
* the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used
* to identify the BLOB on the server instead
- * @param key
- * the key identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used to
- * identify the BLOB on the server instead
* @param blobKey
* the BLOB key to identify the BLOB to download if either the job ID or the regular key are
* <code>null</code>
* @throws IOException
* thrown if an I/O error occurs while writing the header data to the output stream
*/
- private void sendGetHeader(OutputStream outputStream, JobID jobID, String key, BlobKey blobKey) throws IOException {
+ private void sendGetHeader(OutputStream outputStream, JobID jobID, BlobKey blobKey) throws IOException {
+ checkArgument(jobID == null);
// Signal type of operation
outputStream.write(GET_OPERATION);
// Check if GET should be done in content-addressable manner
- if (jobID == null || key == null) {
+ if (jobID == null) {
outputStream.write(CONTENT_ADDRESSABLE);
blobKey.writeToOutputStream(outputStream);
}
- else {
- outputStream.write(NAME_ADDRESSABLE);
- // Send job ID and key
- outputStream.write(jobID.getBytes());
- byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
- writeLength(keyBytes.length, outputStream);
- outputStream.write(keyBytes);
- }
}
private void receiveAndCheckResponse(InputStream is) throws IOException {
@@ -296,68 +243,7 @@ public final class BlobClient implements Closeable {
* thrown if an I/O error occurs while uploading the data to the BLOB server
*/
public BlobKey put(byte[] value, int offset, int len) throws IOException {
- return putBuffer(null, null, value, offset, len);
- }
-
- /**
- * Uploads the data of the given byte array to the BLOB server and stores it under the given job ID and key.
- *
- * @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
- * @param value
- * the buffer to upload
- * @throws IOException
- * thrown if an I/O error occurs while uploading the data to the BLOB server
- */
- public void put(JobID jobId, String key, byte[] value) throws IOException {
- put(jobId, key, value, 0, value.length);
- }
-
- /**
- * Uploads data from the given byte array to the BLOB server and stores it under the given job ID and key.
- *
- * @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
- * @param value
- * the buffer to upload data from
- * @param offset
- * the read offset within the buffer
- * @param len
- * the number of bytes to upload from the buffer
- * @throws IOException
- * thrown if an I/O error occurs while uploading the data to the BLOB server
- */
- public void put(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
- }
-
- putBuffer(jobId, key, value, offset, len);
- }
-
- /**
- * Uploads data from the given input stream to the BLOB server and stores it under the given job ID and key.
- *
- * @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
- * @param inputStream
- * the input stream to read the data from
- * @throws IOException
- * thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the
- * BLOB server
- */
- public void put(JobID jobId, String key, InputStream inputStream) throws IOException {
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
- }
-
- putInputStream(jobId, key, inputStream);
+ return putBuffer(null, value, offset, len);
}
/**
@@ -371,7 +257,7 @@ public final class BlobClient implements Closeable {
* BLOB server
*/
public BlobKey put(InputStream inputStream) throws IOException {
- return putInputStream(null, null, inputStream);
+ return putInputStream(null, inputStream);
}
/**
@@ -380,9 +266,6 @@ public final class BlobClient implements Closeable {
* @param jobId
* the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
* manner
- * @param key
- * the key to identify the BLOB on the server or <code>null</code> to store the BLOB in a content-addressable
- * manner
* @param value
* the buffer to read the data from
* @param offset
@@ -394,7 +277,7 @@ public final class BlobClient implements Closeable {
* @throws IOException
* thrown if an I/O error occurs while uploading the data to the BLOB server
*/
- private BlobKey putBuffer(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
+ private BlobKey putBuffer(JobID jobId, byte[] value, int offset, int len) throws IOException {
if (this.socket.isClosed()) {
throw new IllegalStateException("BLOB Client is not connected. " +
"Client has been shut down or encountered an error before.");
@@ -404,9 +287,6 @@ public final class BlobClient implements Closeable {
if (jobId == null) {
LOG.debug(String.format("PUT content addressable BLOB buffer (%d bytes) to %s",
len, socket.getLocalSocketAddress()));
- } else {
- LOG.debug(String.format("PUT BLOB buffer (%d bytes) under %s / \"%s\" to %s",
- len, jobId, key, socket.getLocalSocketAddress()));
}
}
@@ -415,7 +295,7 @@ public final class BlobClient implements Closeable {
final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
// Send the PUT header
- sendPutHeader(os, jobId, key);
+ sendPutHeader(os, jobId);
// Send the value in iterations of BUFFER_SIZE
int remainingBytes = len;
@@ -453,9 +333,6 @@ public final class BlobClient implements Closeable {
* @param jobId
* the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
* manner
- * @param key
- * the key to identify the BLOB on the server or <code>null</code> to store the BLOB in a content-addressable
- * manner
* @param inputStream
* the input stream to read the data from
* @return he computed BLOB key if the BLOB has been stored in a content-addressable manner, <code>null</code>
@@ -463,7 +340,7 @@ public final class BlobClient implements Closeable {
* @throws IOException
* thrown if an I/O error occurs while uploading the data to the BLOB server
*/
- private BlobKey putInputStream(JobID jobId, String key, InputStream inputStream) throws IOException {
+ private BlobKey putInputStream(JobID jobId, InputStream inputStream) throws IOException {
if (this.socket.isClosed()) {
throw new IllegalStateException("BLOB Client is not connected. " +
"Client has been shut down or encountered an error before.");
@@ -473,9 +350,6 @@ public final class BlobClient implements Closeable {
if (jobId == null) {
LOG.debug(String.format("PUT content addressable BLOB stream to %s",
socket.getLocalSocketAddress()));
- } else {
- LOG.debug(String.format("PUT BLOB stream under %s / \"%s\" to %s",
- jobId, key, socket.getLocalSocketAddress()));
}
}
@@ -485,7 +359,7 @@ public final class BlobClient implements Closeable {
final byte[] xferBuf = new byte[BUFFER_SIZE];
// Send the PUT header
- sendPutHeader(os, jobId, key);
+ sendPutHeader(os, jobId);
while (true) {
final int read = inputStream.read(xferBuf);
@@ -551,33 +425,17 @@ public final class BlobClient implements Closeable {
* @param jobID
* the ID of job the BLOB belongs to or <code>null</code> to indicate the upload of a
* content-addressable BLOB
- * @param key
- * the key of the BLOB to upload or <code>null</code> to indicate the upload of a content-addressable BLOB
* @throws IOException
* thrown if an I/O error occurs while writing the header data to the output stream
*/
- private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) throws IOException {
- // sanity check that either both are null or both are not null
- if ((jobID != null || key != null) && !(jobID != null && key != null)) {
- throw new IllegalArgumentException();
- }
+ private void sendPutHeader(OutputStream outputStream, JobID jobID) throws IOException {
+ checkArgument(jobID == null);
// Signal type of operation
outputStream.write(PUT_OPERATION);
// Check if PUT should be done in content-addressable manner
- if (jobID == null) {
- outputStream.write(CONTENT_ADDRESSABLE);
- }
- else {
- outputStream.write(NAME_ADDRESSABLE);
- // Send job ID and the key
- byte[] idBytes = jobID.getBytes();
- byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
- outputStream.write(idBytes);
- writeLength(keyBytes.length, outputStream);
- outputStream.write(keyBytes);
- }
+ outputStream.write(CONTENT_ADDRESSABLE);
}
// --------------------------------------------------------------------------------------------
@@ -587,67 +445,14 @@ public final class BlobClient implements Closeable {
/**
* Deletes the BLOB identified by the given BLOB key from the BLOB server.
*
- * @param key
+ * @param blobKey
* the key to identify the BLOB
* @throws IOException
* thrown if an I/O error occurs while transferring the request to
* the BLOB server or if the BLOB server cannot delete the file
*/
- public void delete(BlobKey key) throws IOException {
- checkArgument(key != null, "BLOB key must not be null.");
-
- deleteInternal(null, null, key);
- }
-
- /**
- * Deletes the BLOB identified by the given job ID and key from the BLOB server.
- *
- * @param jobId
- * the job ID to identify the BLOB
- * @param key
- * the key to identify the BLOB
- * @throws IOException
- * thrown if an I/O error occurs while transferring the request to the BLOB server
- */
- public void delete(JobID jobId, String key) throws IOException {
- checkArgument(jobId != null, "Job id must not be null.");
- checkArgument(key != null, "BLOB name must not be null.");
-
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
- }
-
- deleteInternal(jobId, key, null);
- }
-
- /**
- * Deletes all BLOBs belonging to the job with the given ID from the BLOB server
- *
- * @param jobId
- * the job ID to identify the BLOBs to be deleted
- * @throws IOException
- * thrown if an I/O error occurs while transferring the request to the BLOB server
- */
- public void deleteAll(JobID jobId) throws IOException {
- checkArgument(jobId != null, "Job id must not be null.");
-
- deleteInternal(jobId, null, null);
- }
-
- /**
- * Delete one or multiple BLOBs from the BLOB server.
- *
- * @param jobId The job ID to identify the BLOB(s) to be deleted.
- * @param key The key to identify the specific BLOB to delete or <code>null</code> to delete
- * all BLOBs associated with the job id.
- * @param bKey The blob key to identify a specific content addressable BLOB. This parameter
- * is exclusive with jobId and key.
- * @throws IOException Thrown if an I/O error occurs while transferring the request to the BLOB server.
- */
- private void deleteInternal(JobID jobId, String key, BlobKey bKey) throws IOException {
- if ((jobId != null && bKey != null) || (jobId == null && bKey == null)) {
- throw new IllegalArgumentException();
- }
+ public void delete(BlobKey blobKey) throws IOException {
+ checkArgument(blobKey != null, "BLOB key must not be null.");
try {
final OutputStream outputStream = this.socket.getOutputStream();
@@ -656,28 +461,9 @@ public final class BlobClient implements Closeable {
// Signal type of operation
outputStream.write(DELETE_OPERATION);
- // Check if DELETE should be done in content-addressable manner
- if (jobId == null) {
- // delete blob key
- outputStream.write(CONTENT_ADDRESSABLE);
- bKey.writeToOutputStream(outputStream);
- }
- else if (key != null) {
- // delete BLOB for jobID and name key
- outputStream.write(NAME_ADDRESSABLE);
- // Send job ID and the key
- byte[] idBytes = jobId.getBytes();
- byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
- outputStream.write(idBytes);
- writeLength(keyBytes.length, outputStream);
- outputStream.write(keyBytes);
- }
- else {
- // delete all blobs for JobID
- outputStream.write(JOB_ID_SCOPE);
- byte[] idBytes = jobId.getBytes();
- outputStream.write(idBytes);
- }
+ // delete blob key
+ outputStream.write(CONTENT_ADDRESSABLE);
+ blobKey.writeToOutputStream(outputStream);
int response = inputStream.read();
if (response < 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 3b3a39b..ecb4527 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.blob;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -26,7 +25,6 @@ import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -200,31 +198,6 @@ public class BlobServer extends Thread implements BlobService {
}
/**
- * Returns a file handle to the file identified by the given jobID and key.
- *
- * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
- *
- * @param jobID to which the file is associated
- * @param key to identify the file within the job context
- * @return file handle to the file
- */
- File getStorageLocation(JobID jobID, String key) {
- return BlobUtils.getStorageLocation(storageDir, jobID, key);
- }
-
- /**
- * Method which deletes all files associated with the given jobID.
- *
- * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
- *
- * @param jobID all files associated to this jobID will be deleted
- * @throws IOException
- */
- void deleteJobDirectory(JobID jobID) throws IOException {
- BlobUtils.deleteJobDirectory(storageDir, jobID);
- }
-
- /**
* Returns a temporary file inside the BLOB server's incoming directory.
*
* @return a temporary file inside the BLOB server's incoming directory
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index ad33c5d..181211d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -42,9 +42,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
@@ -112,7 +109,6 @@ class BlobServerConnection extends Thread {
try {
final InputStream inputStream = this.clientSocket.getInputStream();
final OutputStream outputStream = this.clientSocket.getOutputStream();
- final byte[] buffer = new byte[BUFFER_SIZE];
while (true) {
// Read the requested operation
@@ -124,13 +120,13 @@ class BlobServerConnection extends Thread {
switch (operation) {
case PUT_OPERATION:
- put(inputStream, outputStream, buffer);
+ put(inputStream, outputStream, new byte[BUFFER_SIZE]);
break;
case GET_OPERATION:
- get(inputStream, outputStream, buffer);
+ get(inputStream, outputStream, new byte[BUFFER_SIZE]);
break;
case DELETE_OPERATION:
- delete(inputStream, outputStream, buffer);
+ delete(inputStream, outputStream);
break;
default:
throw new IOException("Unknown operation " + operation);
@@ -182,7 +178,7 @@ class BlobServerConnection extends Thread {
* thrown if an I/O error occurs while reading/writing data from/to the respective streams
*/
private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
- /**
+ /*
* Retrieve the file from the (distributed?) BLOB store and store it
* locally, then send it to the service which requested it.
*
@@ -194,7 +190,6 @@ class BlobServerConnection extends Thread {
File blobFile;
int contentAddressable = -1;
JobID jobId = null;
- String key = null;
BlobKey blobKey = null;
try {
@@ -203,16 +198,7 @@ class BlobServerConnection extends Thread {
if (contentAddressable < 0) {
throw new EOFException("Premature end of GET request");
}
- if (contentAddressable == NAME_ADDRESSABLE) {
- // Receive the job ID and key
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-
- jobId = JobID.fromByteArray(jidBytes);
- key = readKey(buf, inputStream);
- blobFile = blobServer.getStorageLocation(jobId, key);
- }
- else if (contentAddressable == CONTENT_ADDRESSABLE) {
+ if (contentAddressable == CONTENT_ADDRESSABLE) {
blobKey = BlobKey.readFromInputStream(inputStream);
blobFile = blobServer.getStorageLocation(blobKey);
}
@@ -248,13 +234,7 @@ class BlobServerConnection extends Thread {
if (blobFile.exists()) {
LOG.debug("Blob file {} has downloaded from the BlobStore by a different connection.", blobFile);
} else {
- if (contentAddressable == NAME_ADDRESSABLE) {
- blobStore.get(jobId, key, blobFile);
- } else if (contentAddressable == CONTENT_ADDRESSABLE) {
- blobStore.get(blobKey, blobFile);
- } else {
- throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
- }
+ blobStore.get(blobKey, blobFile);
}
} finally {
writeLock.unlock();
@@ -321,7 +301,6 @@ class BlobServerConnection extends Thread {
*/
private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
JobID jobID = null;
- String key = null;
MessageDigest md = null;
File incomingFile = null;
@@ -333,14 +312,7 @@ class BlobServerConnection extends Thread {
throw new EOFException("Premature end of PUT request");
}
- if (contentAddressable == NAME_ADDRESSABLE) {
- // Receive the job ID and key
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
- jobID = JobID.fromByteArray(jidBytes);
- key = readKey(buf, inputStream);
- }
- else if (contentAddressable == CONTENT_ADDRESSABLE) {
+ if (contentAddressable == CONTENT_ADDRESSABLE) {
md = BlobUtils.createMessageDigest();
}
else {
@@ -348,11 +320,7 @@ class BlobServerConnection extends Thread {
}
if (LOG.isDebugEnabled()) {
- if (contentAddressable == NAME_ADDRESSABLE) {
- LOG.debug(String.format("Received PUT request for BLOB under %s / \"%s\"", jobID, key));
- } else {
- LOG.debug("Received PUT request for content addressable BLOB");
- }
+ LOG.debug("Received PUT request for content addressable BLOB");
}
incomingFile = blobServer.createTemporaryFilename();
@@ -377,89 +345,47 @@ class BlobServerConnection extends Thread {
}
fos.close();
- if (contentAddressable == NAME_ADDRESSABLE) {
- File storageFile = blobServer.getStorageLocation(jobID, key);
+ BlobKey blobKey = new BlobKey(md.digest());
+ File storageFile = blobServer.getStorageLocation(blobKey);
- writeLock.lock();
+ writeLock.lock();
- try {
- // first check whether the file already exists
- if (!storageFile.exists()) {
- try {
- // only move the file if it does not yet exist
- Files.move(incomingFile.toPath(), storageFile.toPath());
-
- incomingFile = null;
-
- } catch (FileAlreadyExistsException ignored) {
- LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
- "BlobServer use the same storage directory.");
- // we cannot be sure at this point whether the file has already been uploaded to the blob
- // store or not. Even if the blobStore might shortly be in an inconsistent state, we have
- // persist the blob. Otherwise we might not be able to recover the job.
- }
-
- // only the one moving the incoming file to its final destination is allowed to upload the
- // file to the blob store
- blobStore.put(storageFile, jobID, key);
- }
- } catch(IOException ioe) {
- // we failed to either create the local storage file or to upload it --> try to delete the local file
- // while still having the write lock
- if (storageFile.exists() && !storageFile.delete()) {
- LOG.warn("Could not delete the storage file.");
- }
-
- throw ioe;
- } finally {
- writeLock.unlock();
- }
-
- outputStream.write(RETURN_OKAY);
- }
- else {
- BlobKey blobKey = new BlobKey(md.digest());
- File storageFile = blobServer.getStorageLocation(blobKey);
-
- writeLock.lock();
+ try {
+ // first check whether the file already exists
+ if (!storageFile.exists()) {
+ try {
+ // only move the file if it does not yet exist
+ Files.move(incomingFile.toPath(), storageFile.toPath());
- try {
- // first check whether the file already exists
- if (!storageFile.exists()) {
- try {
- // only move the file if it does not yet exist
- Files.move(incomingFile.toPath(), storageFile.toPath());
-
- incomingFile = null;
-
- } catch (FileAlreadyExistsException ignored) {
- LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
- "BlobServer use the same storage directory.");
- // we cannot be sure at this point whether the file has already been uploaded to the blob
- // store or not. Even if the blobStore might shortly be in an inconsistent state, we have
- // persist the blob. Otherwise we might not be able to recover the job.
- }
+ incomingFile = null;
- // only the one moving the incoming file to its final destination is allowed to upload the
- // file to the blob store
- blobStore.put(storageFile, blobKey);
- }
- } catch(IOException ioe) {
- // we failed to either create the local storage file or to upload it --> try to delete the local file
- // while still having the write lock
- if (storageFile.exists() && !storageFile.delete()) {
- LOG.warn("Could not delete the storage file.");
+ } catch (FileAlreadyExistsException ignored) {
+ LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
+ "BlobServer use the same storage directory.");
+ // we cannot be sure at this point whether the file has already been uploaded to the blob
+ // store or not. Even if the blobStore might shortly be in an inconsistent state, we have
+ // persist the blob. Otherwise we might not be able to recover the job.
}
- throw ioe;
- } finally {
- writeLock.unlock();
+ // only the one moving the incoming file to its final destination is allowed to upload the
+ // file to the blob store
+ blobStore.put(storageFile, blobKey);
+ }
+ } catch(IOException ioe) {
+ // we failed to either create the local storage file or to upload it --> try to delete the local file
+ // while still having the write lock
+ if (storageFile.exists() && !storageFile.delete()) {
+ LOG.warn("Could not delete the storage file.");
}
- // Return computed key to client for validation
- outputStream.write(RETURN_OKAY);
- blobKey.writeToOutputStream(outputStream);
+ throw ioe;
+ } finally {
+ writeLock.unlock();
}
+
+ // Return computed key to client for validation
+ outputStream.write(RETURN_OKAY);
+ blobKey.writeToOutputStream(outputStream);
}
catch (SocketException e) {
// happens when the other side disconnects
@@ -499,7 +425,7 @@ class BlobServerConnection extends Thread {
* @param outputStream The output stream to write the response to.
* @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream.
*/
- private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
+ private void delete(InputStream inputStream, OutputStream outputStream) throws IOException {
try {
int type = inputStream.read();
@@ -511,46 +437,6 @@ class BlobServerConnection extends Thread {
BlobKey key = BlobKey.readFromInputStream(inputStream);
blobServer.delete(key);
}
- else if (type == NAME_ADDRESSABLE) {
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
- JobID jobID = JobID.fromByteArray(jidBytes);
-
- String key = readKey(buf, inputStream);
-
- File blobFile = this.blobServer.getStorageLocation(jobID, key);
-
- writeLock.lock();
-
- try {
- // we should make the local and remote file deletion atomic, otherwise we might risk not
- // removing the remote file in case of a concurrent put operation
- if (blobFile.exists() && !blobFile.delete()) {
- LOG.warn("Cannot delete local BLOB file " + blobFile.getAbsolutePath());
- }
-
- blobStore.delete(jobID, key);
- } finally {
- writeLock.unlock();
- }
- }
- else if (type == JOB_ID_SCOPE) {
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
- JobID jobID = JobID.fromByteArray(jidBytes);
-
- writeLock.lock();
-
- try {
- // we should make the local and remote file deletion atomic, otherwise we might risk not
- // removing the remote file in case of a concurrent put operation
- blobServer.deleteJobDirectory(jobID);
-
- blobStore.deleteAll(jobID);
- } finally {
- writeLock.unlock();
- }
- }
else {
throw new IOException("Unrecognized addressing type: " + type);
}
@@ -575,27 +461,6 @@ class BlobServerConnection extends Thread {
// --------------------------------------------------------------------------------------------
/**
- * Reads the key of a BLOB from the given input stream.
- *
- * @param buf
- * auxiliary buffer to data deserialization
- * @param inputStream
- * the input stream to read the key from
- * @return the key of a BLOB
- * @throws IOException
- * thrown if an I/O error occurs while reading the key data from the input stream
- */
- private static String readKey(byte[] buf, InputStream inputStream) throws IOException {
- final int keyLength = readLength(inputStream);
- if (keyLength > MAX_KEY_LENGTH) {
- throw new IOException("Unexpected key length " + keyLength);
- }
-
- readFully(inputStream, buf, 0, keyLength, "BlobKey");
- return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET);
- }
-
- /**
* Writes to the output stream the error return code, and the given exception in serialized form.
*
* @param out Thr output stream to write to.
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
index 6df7811..d8ac833 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -26,9 +26,6 @@ public class BlobServerProtocol {
/** The buffer size in bytes for network transfers. */
static final int BUFFER_SIZE = 65536; // 64 K
- /** The maximum key length allowed for storing BLOBs. */
- static final int MAX_KEY_LENGTH = 64;
-
/** Internal code to identify a PUT operation. */
static final byte PUT_OPERATION = 0;
@@ -44,15 +41,14 @@ public class BlobServerProtocol {
/** Internal code to identify an erroneous operation. */
static final byte RETURN_ERROR = 1;
- /** Internal code to identify a reference via content hash as the key */
+ /**
+ * Internal code to identify a reference via content hash as the key.
+ * <p>
+ * Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
+ * <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
+ */
static final byte CONTENT_ADDRESSABLE = 0;
- /** Internal code to identify a reference via jobId and name as the key */
- static final byte NAME_ADDRESSABLE = 1;
-
- /** Internal code to identify a reference via jobId as the key */
- static final byte JOB_ID_SCOPE = 2;
-
// --------------------------------------------------------------------------------------------
private BlobServerProtocol() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 4c26a5a..1e8b73a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -38,18 +38,6 @@ public interface BlobStore extends BlobView {
void put(File localFile, BlobKey blobKey) throws IOException;
/**
- * Copies a local file to the blob store.
- *
- * <p>The job ID and key make up a composite key for the file.
- *
- * @param localFile The file to copy
- * @param jobId The JobID part of ID for the file in the blob store
- * @param key The String part of ID for the file in the blob store
- * @throws IOException If the copy fails
- */
- void put(File localFile, JobID jobId, String key) throws IOException;
-
- /**
* Tries to delete a blob from storage.
*
* <p>NOTE: This also tries to delete any created directories if empty.</p>
@@ -59,16 +47,6 @@ public interface BlobStore extends BlobView {
void delete(BlobKey blobKey);
/**
- * Tries to delete a blob from storage.
- *
- * <p>NOTE: This also tries to delete any created directories if empty.</p>
- *
- * @param jobId The JobID part of ID for the blob
- * @param key The String part of ID for the blob
- */
- void delete(JobID jobId, String key);
-
- /**
* Tries to delete all blobs for the given job from storage.
*
* <p>NOTE: This also tries to delete any created directories if empty.</p>
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 9d538cc..e8f3fe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -18,16 +18,13 @@
package org.apache.flink.runtime.blob;
-import com.google.common.io.BaseEncoding;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
@@ -37,7 +34,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
-import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
@@ -58,17 +54,12 @@ public class BlobUtils {
/**
* The prefix of all BLOB files stored by the BLOB server.
*/
- static final String BLOB_FILE_PREFIX = "blob_";
+ private static final String BLOB_FILE_PREFIX = "blob_";
/**
* The prefix of all job-specific directories created by the BLOB server.
*/
- static final String JOB_DIR_PREFIX = "job_";
-
- /**
- * The default character set to translate between characters and bytes.
- */
- static final Charset DEFAULT_CHARSET = ConfigConstants.DEFAULT_CHARSET;
+ private static final String JOB_DIR_PREFIX = "job_";
/**
* Creates a BlobStore based on the parameters set in the configuration.
@@ -205,60 +196,6 @@ public class BlobUtils {
}
/**
- * Returns the (designated) physical storage location of the BLOB with the given job ID and key.
- *
- * @param jobID
- * the ID of the job the BLOB belongs to
- * @param key
- * the key of the BLOB
- * @return the (designated) physical storage location of the BLOB with the given job ID and key
- */
- static File getStorageLocation(File storageDir, JobID jobID, String key) {
- return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
- }
-
- /**
- * Returns the BLOB server's storage directory for BLOBs belonging to the job with the given ID.
- *
- * @param jobID
- * the ID of the job to return the storage directory for
- * @return the storage directory for BLOBs belonging to the job with the given ID
- */
- private static File getJobDirectory(File storageDir, JobID jobID) {
- final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
-
- // note: thread-safe create should try to mkdir first and then ignore the case that the
- // directory already existed
- if (!jobDirectory.mkdirs() && !jobDirectory.exists()) {
- throw new RuntimeException("Could not create jobId directory '" + jobDirectory.getAbsolutePath() + "'.");
- }
-
- return jobDirectory;
- }
-
- /**
- * Translates the user's key for a BLOB into the internal name used by the BLOB server
- *
- * @param key
- * the user's key for a BLOB
- * @return the internal name for the BLOB as used by the BLOB server
- */
- static String encodeKey(String key) {
- return BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET));
- }
-
- /**
- * Deletes the storage directory for the job with the given ID.
- *
- * @param jobID
- * jobID whose directory shall be deleted
- */
- static void deleteJobDirectory(File storageDir, JobID jobID) throws IOException {
- File directory = getJobDirectory(storageDir, jobID);
- FileUtils.deleteDirectory(directory);
- }
-
- /**
* Creates a new instance of the message digest to use for the BLOB key computation.
*
* @return a new instance of the message digest to use for the BLOB key computation
@@ -409,19 +346,6 @@ public class BlobUtils {
}
/**
- * Returns the path for the given job ID and key.
- *
- * <p>The returned path can be used with the state backend for recovery purposes.
- *
- * <p>This follows the same scheme as {@link #getStorageLocation(File, JobID, String)}.
- */
- static String getRecoveryPath(String basePath, JobID jobId, String key) {
- // format: $base/job_$id/blob_$key
- return String.format("%s/%s%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString(),
- BLOB_FILE_PREFIX, encodeKey(key));
- }
-
- /**
* Returns the path for the given job ID.
*
* <p>The returned path can be used with the state backend for recovery purposes.
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
index 11cf011..d174cf0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -36,14 +36,4 @@ public interface BlobView {
* @throws IOException If the copy fails
*/
void get(BlobKey blobKey, File localFile) throws IOException;
-
- /**
- * Copies a blob to a local file.
- *
- * @param jobId The JobID part of ID for the blob
- * @param key The String part of ID for the blob
- * @param localFile The local file to copy to
- * @throws IOException If the copy fails
- */
- void get(JobID jobId, String key, File localFile) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index f3607d8..5f8058b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -68,11 +68,6 @@ public class FileSystemBlobStore implements BlobStoreService {
put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
}
- @Override
- public void put(File localFile, JobID jobId, String key) throws IOException {
- put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key));
- }
-
private void put(File fromFile, String toBlobPath) throws IOException {
try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
@@ -87,11 +82,6 @@ public class FileSystemBlobStore implements BlobStoreService {
get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
}
- @Override
- public void get(JobID jobId, String key, File localFile) throws IOException {
- get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile);
- }
-
private void get(String fromBlobPath, File toFile) throws IOException {
checkNotNull(fromBlobPath, "Blob path");
checkNotNull(toFile, "File");
@@ -127,11 +117,6 @@ public class FileSystemBlobStore implements BlobStoreService {
}
@Override
- public void delete(JobID jobId, String key) {
- delete(BlobUtils.getRecoveryPath(basePath, jobId, key));
- }
-
- @Override
public void deleteAll(JobID jobId) {
delete(BlobUtils.getRecoveryPath(basePath, jobId));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index c14d082..6e2bb53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -32,27 +32,16 @@ public class VoidBlobStore implements BlobStoreService {
public void put(File localFile, BlobKey blobKey) throws IOException {
}
- @Override
- public void put(File localFile, JobID jobId, String key) throws IOException {
- }
@Override
public void get(BlobKey blobKey, File localFile) throws IOException {
}
@Override
- public void get(JobID jobId, String key, File localFile) throws IOException {
- }
-
- @Override
public void delete(BlobKey blobKey) {
}
@Override
- public void delete(JobID jobId, String key) {
- }
-
- @Override
public void deleteAll(JobID jobId) {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 340ac42..a5189ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -237,55 +237,6 @@ public class BlobClientSslTest extends TestLogger {
}
/**
- * Tests the PUT/GET operations for regular (non-content-addressable) streams.
- */
- @Test
- public void testRegularStream() {
-
- final JobID jobID = JobID.generate();
- final String key = "testkey3";
-
- try {
- final File testFile = File.createTempFile("testfile", ".dat");
- testFile.deleteOnExit();
- prepareTestFile(testFile);
-
- BlobClient client = null;
- InputStream is = null;
- try {
-
- final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
- client = new BlobClient(serverAddress, sslClientConfig);
-
- // Store the data
- is = new FileInputStream(testFile);
- client.put(jobID, key, is);
-
- is.close();
- is = null;
-
- // Retrieve the data
- is = client.get(jobID, key);
- validateGet(is, testFile);
-
- }
- finally {
- if (is != null) {
- is.close();
- }
- if (client != null) {
- client.close();
- }
- }
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper.
*/
private void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index fda4ee9..0a8f738 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -286,102 +286,6 @@ public class BlobClientTest {
}
/**
- * Tests the PUT/GET operations for regular (non-content-addressable) buffers.
- */
- @Test
- public void testRegularBuffer() {
-
- final byte[] testBuffer = createTestBuffer();
- final JobID jobID = JobID.generate();
- final String key = "testkey";
-
- try {
- BlobClient client = null;
- try {
- final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
- client = new BlobClient(serverAddress, blobServiceConfig);
-
- // Store the data
- client.put(jobID, key, testBuffer);
-
- // Retrieve the data
- final InputStream is = client.get(jobID, key);
- validateGet(is, testBuffer);
-
- // Delete the data
- client.delete(jobID, key);
-
- // Check if the BLOB is still available
- try {
- client.get(jobID, key);
- fail("Expected IOException did not occur");
- }
- catch (IOException e) {
- // expected
- }
- }
- finally {
- if (client != null) {
- client.close();
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests the PUT/GET operations for regular (non-content-addressable) streams.
- */
- @Test
- public void testRegularStream() {
-
- final JobID jobID = JobID.generate();
- final String key = "testkey3";
-
- try {
- final File testFile = File.createTempFile("testfile", ".dat");
- testFile.deleteOnExit();
- prepareTestFile(testFile);
-
- BlobClient client = null;
- InputStream is = null;
- try {
-
- final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
- client = new BlobClient(serverAddress, blobServiceConfig);
-
- // Store the data
- is = new FileInputStream(testFile);
- client.put(jobID, key, is);
-
- is.close();
- is = null;
-
- // Retrieve the data
- is = client.get(jobID, key);
- validateGet(is, testFile);
-
- }
- finally {
- if (is != null) {
- is.close();
- }
- if (client != null) {
- client.close();
- }
- }
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper.
*/
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 4f12ddb..c2a3a7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -97,10 +97,6 @@ public class BlobRecoveryITCase extends TestLogger {
keys[1] = client.put(expected, 32, 256); // Request 2
JobID[] jobId = new JobID[] { new JobID(), new JobID() };
- String[] testKey = new String[] { "test-key-1", "test-key-2" };
-
- client.put(jobId[0], testKey[0], expected); // Request 3
- client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4
// check that the storage directory exists
final Path blobServerPath = new Path(storagePath, "blob");
@@ -132,31 +128,9 @@ public class BlobRecoveryITCase extends TestLogger {
}
}
- // Verify request 3
- try (InputStream is = client.get(jobId[0], testKey[0])) {
- byte[] actual = new byte[expected.length];
- BlobUtils.readFully(is, actual, 0, expected.length, null);
-
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], actual[i]);
- }
- }
-
- // Verify request 4
- try (InputStream is = client.get(jobId[1], testKey[1])) {
- byte[] actual = new byte[256];
- BlobUtils.readFully(is, actual, 0, 256, null);
-
- for (int i = 32, j = 0; i < 256; i++, j++) {
- assertEquals(expected[i], actual[j]);
- }
- }
-
// Remove again
client.delete(keys[0]);
client.delete(keys[1]);
- client.delete(jobId[0], testKey[0]);
- client.delete(jobId[1], testKey[1]);
// Verify everything is clean
assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 15e2c7a..7100e79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -120,129 +119,6 @@ public class BlobServerDeleteTest extends TestLogger {
}
@Test
- public void testDeleteSingleByName() {
- BlobServer server = null;
- BlobClient client = null;
- BlobStore blobStore = new VoidBlobStore();
-
- try {
- Configuration config = new Configuration();
- server = new BlobServer(config, blobStore);
-
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- JobID jobID = new JobID();
- String name1 = "random name";
- String name2 = "any nyme";
-
- client.put(jobID, name1, data);
- client.put(jobID, name2, data);
-
- // issue a DELETE request via the client
- client.delete(jobID, name1);
- client.close();
-
- client = new BlobClient(serverAddress, config);
- try {
- client.get(jobID, name1);
- fail("BLOB should have been deleted");
- }
- catch (IOException e) {
- // expected
- }
-
- try {
- client.put(new byte[1]);
- fail("client should be closed after erroneous operation");
- }
- catch (IllegalStateException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- cleanup(server, client);
- }
- }
-
- @Test
- public void testDeleteAll() {
- BlobServer server = null;
- BlobClient client = null;
- BlobStore blobStore = new VoidBlobStore();
-
- try {
- Configuration config = new Configuration();
- server = new BlobServer(config, blobStore);
-
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- JobID jobID = new JobID();
- String name1 = "random name";
- String name2 = "any nyme";
-
- // put content addressable (like libraries)
- client.put(jobID, name1, data);
- client.put(jobID, name2, new byte[712]);
- // items for a second (different!) job ID
- final byte[] jobIdBytes = jobID.getBytes();
- jobIdBytes[0] ^= 1;
- JobID jobID2 = JobID.fromByteArray(jobIdBytes);
- client.put(jobID2, name1, data);
- client.put(jobID2, name2, new byte[712]);
-
-
- // issue a DELETE ALL request via the client
- client.deleteAll(jobID);
- client.close();
-
- client = new BlobClient(serverAddress, config);
- try {
- client.get(jobID, name1);
- fail("BLOB should have been deleted");
- }
- catch (IOException e) {
- // expected
- }
-
- try {
- client.put(new byte[1]);
- fail("client should be closed after erroneous operation");
- }
- catch (IllegalStateException e) {
- // expected
- }
-
- client = new BlobClient(serverAddress, config);
- try {
- client.get(jobID, name2);
- fail("BLOB should have been deleted");
- }
- catch (IOException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- cleanup(server, client);
- }
- }
-
- @Test
public void testDeleteAlreadyDeletedByBlobKey() {
BlobServer server = null;
BlobClient client = null;
@@ -286,48 +162,7 @@ public class BlobServerDeleteTest extends TestLogger {
}
@Test
- public void testDeleteAlreadyDeletedByName() {
- BlobServer server = null;
- BlobClient client = null;
- BlobStore blobStore = new VoidBlobStore();
-
- try {
- Configuration config = new Configuration();
- server = new BlobServer(config, blobStore);
-
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- JobID jid = new JobID();
- String name = "------------fdghljEgRJHF+##4U789Q345";
-
- client.put(jid, name, data);
-
- File blobFile = server.getStorageLocation(jid, name);
- assertTrue(blobFile.delete());
-
- // issue a DELETE request via the client
- try {
- client.delete(jid, name);
- }
- catch (IOException e) {
- fail("DELETE operation should not fail if file is already deleted");
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- cleanup(server, client);
- }
- }
-
- @Test
- public void testDeleteFailsByBlobKey() {
+ public void testDeleteByBlobKeyFails() {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
BlobServer server = null;
@@ -376,52 +211,6 @@ public class BlobServerDeleteTest extends TestLogger {
}
}
- @Test
- public void testDeleteByNameFails() {
- assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
-
- BlobServer server = null;
- BlobClient client = null;
- BlobStore blobStore = new VoidBlobStore();
-
- File blobFile = null;
- File directory = null;
- try {
- Configuration config = new Configuration();
- server = new BlobServer(config, blobStore);
-
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- JobID jid = new JobID();
- String name = "------------fdghljEgRJHF+##4U789Q345";
-
- client.put(jid, name, data);
-
- blobFile = server.getStorageLocation(jid, name);
- directory = blobFile.getParentFile();
-
- assertTrue(blobFile.setWritable(false, false));
- assertTrue(directory.setWritable(false, false));
-
- // issue a DELETE request via the client
- client.delete(jid, name);
-
- // the file should still be there
- client.get(jid, name);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- blobFile.setWritable(true, false);
- directory.setWritable(true, false);
- cleanup(server, client);
- }
- }
-
/**
* FLINK-6020
*
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 31e63a0..8b8ddf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -44,7 +43,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -121,47 +124,6 @@ public class BlobServerPutTest extends TestLogger {
}
}
- /**
- * Checked thread that calls {@link BlobServer#getStorageLocation(JobID, String)}
- */
- public static class NameAddressableGetStorageLocation extends CheckedThread {
- private final BlobServer server;
- private final JobID jid;
- private final String name;
-
- public NameAddressableGetStorageLocation(BlobServer server, JobID jid, String name) {
- this.server = server;
- this.jid = jid;
- this.name = name;
- }
-
- @Override
- public void go() throws Exception {
- server.getStorageLocation(jid, name);
- }
- }
-
- /**
- * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, String)}.
- */
- @Test
- public void testServerNameAddressableGetStorageLocationConcurrent() throws Exception {
- BlobServer server = new BlobServer(new Configuration(), new VoidBlobStore());
-
- try {
- JobID jid = new JobID();
- String stringKey = "my test key";
- CheckedThread[] threads = new CheckedThread[] {
- new NameAddressableGetStorageLocation(server, jid, stringKey),
- new NameAddressableGetStorageLocation(server, jid, stringKey),
- new NameAddressableGetStorageLocation(server, jid, stringKey)
- };
- checkedThreadSimpleTest(threads);
- } finally {
- server.close();
- }
- }
-
// --------------------------------------------------------------------------------------------
@Test
@@ -186,11 +148,6 @@ public class BlobServerPutTest extends TestLogger {
BlobKey key2 = client.put(data, 10, 44);
assertNotNull(key2);
- // put under job and name scope
- JobID jid = new JobID();
- String stringKey = "my test key";
- client.put(jid, stringKey, data);
-
// --- GET the data and check that it is equal ---
// one get request on the same client
@@ -212,12 +169,6 @@ public class BlobServerPutTest extends TestLogger {
BlobUtils.readFully(is2, result2, 0, result2.length, null);
is2.close();
assertArrayEquals(data, result2);
-
- InputStream is3 = client.get(jid, stringKey);
- byte[] result3 = new byte[data.length];
- BlobUtils.readFully(is3, result3, 0, result3.length, null);
- is3.close();
- assertArrayEquals(data, result3);
} finally {
if (client != null) {
client.close();
@@ -248,14 +199,6 @@ public class BlobServerPutTest extends TestLogger {
{
BlobKey key1 = client.put(new ByteArrayInputStream(data));
assertNotNull(key1);
-
- }
-
- // put under job and name scope
- {
- JobID jid = new JobID();
- String stringKey = "my test key";
- client.put(jid, stringKey, new ByteArrayInputStream(data));
}
} finally {
if (client != null) {
@@ -290,14 +233,6 @@ public class BlobServerPutTest extends TestLogger {
{
BlobKey key1 = client.put(new ChunkedInputStream(data, 19));
assertNotNull(key1);
-
- }
-
- // put under job and name scope
- {
- JobID jid = new JobID();
- String stringKey = "my test key";
- client.put(jid, stringKey, new ChunkedInputStream(data, 17));
}
} finally {
if (client != null) {
@@ -364,64 +299,6 @@ public class BlobServerPutTest extends TestLogger {
}
}
- @Test
- public void testPutNamedBufferFails() throws IOException {
- assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
-
- BlobServer server = null;
- BlobClient client = null;
-
- File tempFileDir = null;
- try {
- Configuration config = new Configuration();
- server = new BlobServer(config, new VoidBlobStore());
-
- // make sure the blob server cannot create any files in its storage dir
- tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
- assertTrue(tempFileDir.setExecutable(true, false));
- assertTrue(tempFileDir.setReadable(true, false));
- assertTrue(tempFileDir.setWritable(false, false));
-
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- // put under job and name scope
- try {
- JobID jid = new JobID();
- String stringKey = "my test key";
- client.put(jid, stringKey, data);
- fail("This should fail.");
- }
- catch (IOException e) {
- assertTrue(e.getMessage(), e.getMessage().contains("Server side error"));
- }
-
- try {
- JobID jid = new JobID();
- String stringKey = "another key";
- client.put(jid, stringKey, data);
- fail("Client should be closed");
- }
- catch (IllegalStateException e) {
- // expected
- }
- } finally {
- // set writable again to make sure we can remove the directory
- if (tempFileDir != null) {
- tempFileDir.setWritable(true, false);
- }
- if (client != null) {
- client.close();
- }
- if (server != null) {
- server.close();
- }
- }
- }
-
/**
* FLINK-6020
*
http://git-wip-us.apache.org/repos/asf/flink/blob/f68856af/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 73f5819..a727294 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -67,6 +67,8 @@ public class BlobLibraryCacheManagerTest {
buf[0] += 1;
keys.add(bc.put(buf));
+ bc.close();
+
long cleanupInterval = 1000l;
libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
@@ -96,13 +98,17 @@ public class BlobLibraryCacheManagerTest {
assertEquals(0, checkFilesExist(keys, libraryCacheManager, false));
try {
- bc.get(jid, "test");
+ server.getURL(keys.get(0));
+ fail("name-addressable BLOB should have been deleted");
+ } catch (IOException e) {
+ // expected
+ }
+ try {
+ server.getURL(keys.get(1));
fail("name-addressable BLOB should have been deleted");
} catch (IOException e) {
// expected
}
-
- bc.close();
}
catch (Exception e) {
e.printStackTrace();
@@ -180,7 +186,6 @@ public class BlobLibraryCacheManagerTest {
keys.add(bc.put(buf));
buf[0] += 1;
keys.add(bc.put(buf));
- bc.put(jid, "test", buf);
long cleanupInterval = 1000l;
libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
[4/4] flink git commit: [FLINK-6965] Include snappy-java in flink-dist
Posted by ch...@apache.org.
[FLINK-6965] Include snappy-java in flink-dist
This closes #4160.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/021d27d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/021d27d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/021d27d5
Branch: refs/heads/master
Commit: 021d27d54960bedd58b00e89f7898a34f3991336
Parents: ed3b326
Author: zentol <ch...@apache.org>
Authored: Wed Jun 21 16:18:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:27:06 2017 +0200
----------------------------------------------------------------------
flink-core/pom.xml | 13 ++++++-------
pom.xml | 6 ++++++
tools/travis_mvn_watchdog.sh | 9 ++++++++-
3 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/021d27d5/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 5b42220..c6ba680 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -79,13 +79,12 @@ under the License.
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <!-- managed version -->
- <exclusions>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
+ </dependency>
+
+ <!-- We explicitly depend on snappy since connectors that require it load it through the system class loader -->
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
</dependency>
<!-- ASM is needed for type extraction -->
http://git-wip-us.apache.org/repos/asf/flink/blob/021d27d5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 091769f..3de92c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -267,6 +267,12 @@ under the License.
<version>1.8.2</version>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.1.3</version>
+ </dependency>
+
<!-- Make sure we use a consistent commons-cli version throughout the project -->
<dependency>
<groupId>commons-cli</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/021d27d5/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index f3c1699..b22bfe0 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -165,7 +165,7 @@ watchdog () {
done
}
-# Check the final fat jar for illegal artifacts
+# Check the final fat jar for illegal or missing artifacts
check_shaded_artifacts() {
jar tf build-target/lib/flink-dist*.jar > allClasses
ASM=`cat allClasses | grep '^org/objectweb/asm/' | wc -l`
@@ -184,6 +184,13 @@ check_shaded_artifacts() {
exit 1
fi
+ SNAPPY=`cat allClasses | grep '^org/xerial/snappy' | wc -l`
+ if [ $SNAPPY == "0" ]; then
+ echo "=============================================================================="
+ echo "Missing snappy dependencies in fat jar"
+ echo "=============================================================================="
+ exit 1
+ fi
}
# =============================================================================
[2/4] flink git commit: [FLINK-7103] [dispatcher] Add skeletal
structure of Dispatcher component
Posted by ch...@apache.org.
[FLINK-7103] [dispatcher] Add skeletal structure of Dispatcher component
The Dispatcher is responsible for receiving job submissions, persisting the JobGraphs,
spawning JobManager to execute the jobs and recovering the jobs in case of a master
failure. This commit adds the basic skeleton including the RPC call for job submission.
Add cleanup logic for finished jobs
Pass BlobService to JobManagerRunner
This closes #4260.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/748eba1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/748eba1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/748eba1b
Branch: refs/heads/master
Commit: 748eba1b5363aeb9e64d379b9b25d5d997bec7ad
Parents: f68856a
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 4 23:15:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 11 18:19:25 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 294 +++++++++++++++++++
.../runtime/dispatcher/DispatcherGateway.java | 55 ++++
.../dispatcher/StandaloneDispatcher.java | 86 ++++++
.../runtime/jobmanager/SubmittedJobGraph.java | 6 +-
.../runtime/jobmaster/JobManagerRunner.java | 6 +-
.../runtime/jobmaster/JobManagerServices.java | 11 +-
.../flink/runtime/minicluster/MiniCluster.java | 17 ++
.../minicluster/MiniClusterJobDispatcher.java | 16 +-
.../runtime/dispatcher/DispatcherTest.java | 149 ++++++++++
.../jobmaster/JobManagerRunnerMockTest.java | 3 +-
.../yarn/YarnFlinkApplicationMasterRunner.java | 7 +
11 files changed, 639 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
new file mode 100644
index 0000000..e0ec049
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for the Dispatcher component. The Dispatcher component is responsible
+ * for receiving job submissions, persisting them, spawning JobManagers to execute
+ * the jobs, and recovering them in case of a master failure. Furthermore, it knows
+ * about the state of the Flink session cluster.
+ */
+public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
+
+ public static final String DISPATCHER_NAME = "dispatcher";
+
+ private final Configuration configuration;
+
+ private final SubmittedJobGraphStore submittedJobGraphStore;
+ private final RunningJobsRegistry runningJobsRegistry;
+
+ private final HighAvailabilityServices highAvailabilityServices;
+ private final BlobServer blobServer;
+ private final HeartbeatServices heartbeatServices;
+ private final MetricRegistry metricRegistry;
+
+ private final FatalErrorHandler fatalErrorHandler;
+
+ private final Map<JobID, JobManagerRunner> jobManagerRunners;
+
+	/**
+	 * Creates the dispatcher endpoint. All services are required to be non-null; the
+	 * submitted job graph store and the running jobs registry are obtained from the
+	 * given high availability services.
+	 *
+	 * @param rpcService rpc service passed on to the endpoint base class
+	 * @param endpointId id passed on to the endpoint base class
+	 * @param configuration configuration handed to the created JobManagerRunners
+	 * @param highAvailabilityServices source of the job graph store and the running jobs registry
+	 * @param blobServer blob server handed to the created JobManagerRunners
+	 * @param heartbeatServices heartbeat services handed to the created JobManagerRunners
+	 * @param metricRegistry metric registry handed to the created JobManagerRunners
+	 * @param fatalErrorHandler fatal error handler handed to the created JobManagerRunners
+	 * @throws Exception if the endpoint or the high availability components cannot be set up
+	 */
+	protected Dispatcher(
+			RpcService rpcService,
+			String endpointId,
+			Configuration configuration,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		super(rpcService, endpointId);
+
+		// mandatory collaborators
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+		this.blobServer = Preconditions.checkNotNull(blobServer);
+		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
+		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
+
+		// bookkeeping of the runners spawned by this dispatcher, keyed by job id
+		this.jobManagerRunners = new HashMap<>(16);
+
+		// components derived from the high availability services
+		this.submittedJobGraphStore = this.highAvailabilityServices.getSubmittedJobGraphStore();
+		this.runningJobsRegistry = this.highAvailabilityServices.getRunningJobsRegistry();
+	}
+
+ //------------------------------------------------------
+ // Lifecycle methods
+ //------------------------------------------------------
+
+	/**
+	 * Shuts down the dispatcher. All registered JobManagerRunners are shut down, the
+	 * submitted job graph store is stopped and finally the endpoint itself is shut down.
+	 * Every step is attempted even if an earlier one failed; the first failure is
+	 * reported with all later failures attached as suppressed exceptions.
+	 *
+	 * @throws Exception if any of the shut down steps failed
+	 */
+	@Override
+	public void shutDown() throws Exception {
+		Exception exception = null;
+
+		// stop all currently running JobManagerRunners; a failing runner must not
+		// prevent the remaining runners and services from being shut down
+		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
+			try {
+				jobManagerRunner.shutdown();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+		}
+
+		jobManagerRunners.clear();
+
+		try {
+			submittedJobGraphStore.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		try {
+			super.shutDown();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
+		}
+	}
+
+ //------------------------------------------------------
+ // RPCs
+ //------------------------------------------------------
+
+	/**
+	 * Accepts a job submission. The job is only accepted if the running jobs registry
+	 * reports it as PENDING. An accepted job graph is first put into the submitted job
+	 * graph store, then a JobManagerRunner is created and started for it. If creating or
+	 * starting the runner fails, the job graph is removed from the store again.
+	 *
+	 * @param jobGraph job graph to submit
+	 * @return acknowledge if the job was accepted and its runner was started
+	 * @throws JobSubmissionException if the job status cannot be retrieved, the job is
+	 *                                not in PENDING state, the job graph cannot be
+	 *                                persisted or the runner cannot be started
+	 */
+	@RpcMethod
+	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
+		final JobID jobId = jobGraph.getJobID();
+
+		log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
+
+		final RunningJobsRegistry.JobSchedulingStatus schedulingStatus;
+
+		try {
+			schedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+		} catch (IOException ioe) {
+			log.warn("Cannot retrieve job status for {}.", jobId, ioe);
+			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", ioe);
+		}
+
+		// reject everything which is not in PENDING state
+		if (schedulingStatus != RunningJobsRegistry.JobSchedulingStatus.PENDING) {
+			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
+				"is currently in state " + schedulingStatus + '.');
+		}
+
+		try {
+			submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
+		} catch (Exception persistFailure) {
+			log.warn("Cannot persist JobGraph.", persistFailure);
+			throw new JobSubmissionException(jobId, "Could not persist JobGraph.", persistFailure);
+		}
+
+		final JobManagerRunner runner;
+
+		try {
+			runner = createJobManagerRunner(
+				ResourceID.generate(),
+				jobGraph,
+				configuration,
+				getRpcService(),
+				highAvailabilityServices,
+				blobServer,
+				heartbeatServices,
+				metricRegistry,
+				new DispatcherOnCompleteActions(jobId),
+				fatalErrorHandler);
+
+			runner.start();
+		} catch (Exception startFailure) {
+			try {
+				// We should only remove a job from the submitted job graph store
+				// if the initial submission failed. Never in case of a recovery
+				submittedJobGraphStore.removeJobGraph(jobId);
+			} catch (Throwable cleanupFailure) {
+				log.warn("Cannot remove job graph from submitted job graph store.", cleanupFailure);
+				startFailure.addSuppressed(cleanupFailure);
+			}
+
+			throw new JobSubmissionException(jobId, "Could not start JobManager.", startFailure);
+		}
+
+		jobManagerRunners.put(jobId, runner);
+
+		return Acknowledge.get();
+	}
+
+	/**
+	 * Lists the ids of all jobs for which this dispatcher currently holds a
+	 * JobManagerRunner.
+	 *
+	 * @return snapshot of the job ids, independent of the dispatcher's internal state
+	 */
+	@RpcMethod
+	public Collection<JobID> listJobs() {
+		// TODO: return proper list of running jobs
+		// Hand out a copy instead of the key set view: the view is backed by the internal
+		// HashMap, would change underneath the caller and is not Serializable.
+		return new ArrayList<>(jobManagerRunners.keySet());
+	}
+
+ /**
+ * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
+ * the data will also be removed from HA.
+ *
+ * @param jobId JobID identifying the job to clean up
+ * @param cleanupHA True iff HA data shall also be cleaned up
+ */
+ private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
+ JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
+
+ if (jobManagerRunner != null) {
+ jobManagerRunner.shutdown();
+ }
+
+ if (cleanupHA) {
+ submittedJobGraphStore.removeJobGraph(jobId);
+ }
+
+ // TODO: remove job related files from blob server
+ }
+
+ /**
+ * Creates the {@link JobManagerRunner} for a submitted job. It is called from
+ * {@code submitJob} after the job graph has been persisted; the returned runner
+ * is started by the caller.
+ *
+ * @param resourceId resource id generated for the new runner
+ * @param jobGraph job graph of the submitted job
+ * @param configuration configuration of the dispatcher
+ * @param rpcService rpc service of the dispatcher
+ * @param highAvailabilityServices high availability services of the dispatcher
+ * @param blobService blob service to be used by the runner
+ * @param heartbeatServices heartbeat services of the dispatcher
+ * @param metricRegistry metric registry of the dispatcher
+ * @param onCompleteActions callbacks to be notified about the completion of the job
+ * @param fatalErrorHandler fatal error handler of the dispatcher
+ * @return the runner for the given job graph
+ * @throws Exception if the runner could not be created
+ */
+ protected abstract JobManagerRunner createJobManagerRunner(
+ ResourceID resourceId,
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobService blobService,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ OnCompletionActions onCompleteActions,
+ FatalErrorHandler fatalErrorHandler) throws Exception;
+
+ //------------------------------------------------------
+ // Utility classes
+ //------------------------------------------------------
+
+	/**
+	 * Completion callbacks for a single job. Every callback logs the event and then
+	 * hands the removal of the job from the dispatcher to {@code runAsync}.
+	 */
+	private class DispatcherOnCompleteActions implements OnCompletionActions {
+
+		private final JobID jobId;
+
+		private DispatcherOnCompleteActions(JobID jobId) {
+			this.jobId = Preconditions.checkNotNull(jobId);
+		}
+
+		/**
+		 * Removes the finished job including its HA data.
+		 *
+		 * @param result result of the finished job (not used)
+		 */
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			log.info("Job {} finished.", jobId);
+
+			removeJobAsync(true);
+		}
+
+		/**
+		 * Removes the failed job including its HA data.
+		 *
+		 * @param cause failure cause of the job, recorded in the log
+		 */
+		@Override
+		public void jobFailed(Throwable cause) {
+			// the throwable is passed as the trailing argument so that the failure
+			// cause ends up in the log instead of being dropped
+			log.info("Job {} failed.", jobId, cause);
+
+			removeJobAsync(true);
+		}
+
+		/**
+		 * Removes the job from this dispatcher but keeps its HA data.
+		 */
+		@Override
+		public void jobFinishedByOther() {
+			log.info("Job {} was finished by other JobManager.", jobId);
+
+			removeJobAsync(false);
+		}
+
+		/**
+		 * Hands the removal of the job to {@code runAsync}. Failures of the removal
+		 * are logged and not propagated.
+		 *
+		 * @param cleanupHA true iff the HA data of the job shall be removed as well
+		 */
+		private void removeJobAsync(final boolean cleanupHA) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						removeJob(jobId, cleanupHA);
+					} catch (Exception e) {
+						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+					}
+				}
+			});
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
new file mode 100644
index 0000000..c730bc1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+
+import java.util.Collection;
+
+/**
+ * Gateway for the Dispatcher component. It offers job submission and the listing
+ * of submitted jobs; both calls return their result as a {@link Future} and take
+ * the timeout of the call as a parameter.
+ */
+public interface DispatcherGateway extends RpcGateway {
+
+ /**
+ * Submit a job to the dispatcher.
+ *
+ * @param jobGraph JobGraph to submit
+ * @param timeout RPC timeout
+ * @return A future acknowledge if the submission succeeded
+ */
+ Future<Acknowledge> submitJob(
+ JobGraph jobGraph,
+ @RpcTimeout Time timeout);
+
+ /**
+ * Lists the current set of submitted jobs.
+ *
+ * @param timeout RPC timeout
+ * @return A future collection of the ids of the currently submitted jobs
+ */
+ Future<Collection<JobID>> listJobs(
+ @RpcTimeout Time timeout);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
new file mode 100644
index 0000000..6687657
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Dispatcher implementation which spawns a {@link JobMaster} for each
+ * submitted {@link JobGraph} within the same process. This dispatcher
+ * can be used as the default for all different session clusters.
+ */
+public class StandaloneDispatcher extends Dispatcher {
+ /**
+ * Creates the dispatcher by passing all arguments on to the {@link Dispatcher}
+ * base class constructor.
+ *
+ * @throws Exception if the base class constructor fails
+ */
+ protected StandaloneDispatcher(
+ RpcService rpcService,
+ String endpointId,
+ Configuration configuration,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ super(
+ rpcService,
+ endpointId,
+ configuration,
+ highAvailabilityServices,
+ blobServer,
+ heartbeatServices,
+ metricRegistry,
+ fatalErrorHandler);
+ }
+
+ /**
+ * Creates a plain {@link JobManagerRunner} from the given arguments, which are
+ * passed on unchanged to the runner's constructor.
+ *
+ * @return newly created runner for the given job graph
+ * @throws Exception if the runner could not be instantiated
+ */
+ @Override
+ protected JobManagerRunner createJobManagerRunner(
+ ResourceID resourceId,
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobService blobService,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ OnCompletionActions onCompleteActions,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ // create the standard job manager runner
+ return new JobManagerRunner(
+ resourceId,
+ jobGraph,
+ configuration,
+ rpcService,
+ highAvailabilityServices,
+ blobService,
+ heartbeatServices,
+ metricRegistry,
+ onCompleteActions,
+ fatalErrorHandler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index e868da7..979f3d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,9 +46,9 @@ public class SubmittedJobGraph implements Serializable {
* @param jobGraph The submitted {@link JobGraph}
* @param jobInfo The {@link JobInfo}
*/
- public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
+ public SubmittedJobGraph(JobGraph jobGraph, @Nullable JobInfo jobInfo) {
this.jobGraph = checkNotNull(jobGraph, "Job graph");
- this.jobInfo = checkNotNull(jobInfo, "Job info");
+ this.jobInfo = jobInfo;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index d7fae35..d557f6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -92,6 +93,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices haServices,
+ final BlobService blobService,
final HeartbeatServices heartbeatServices,
final OnCompletionActions toNotifyOnComplete,
final FatalErrorHandler errorHandler) throws Exception {
@@ -101,6 +103,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
configuration,
rpcService,
haServices,
+ blobService,
heartbeatServices,
new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
toNotifyOnComplete,
@@ -113,6 +116,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices haServices,
+ final BlobService blobService,
final HeartbeatServices heartbeatServices,
final MetricRegistry metricRegistry,
final OnCompletionActions toNotifyOnComplete,
@@ -124,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
rpcService,
haServices,
heartbeatServices,
- JobManagerServices.fromConfiguration(configuration, haServices),
+ JobManagerServices.fromConfiguration(configuration, blobService),
metricRegistry,
toNotifyOnComplete,
errorHandler);
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index ac4d06f..e14f5af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -23,13 +23,13 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.FiniteDuration;
@@ -103,15 +103,16 @@ public class JobManagerServices {
public static JobManagerServices fromConfiguration(
Configuration config,
- HighAvailabilityServices haServices) throws Exception {
+ BlobService blobService) throws Exception {
- final BlobServer blobServer = new BlobServer(config, haServices.createBlobStore());
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(blobService);
final long cleanupInterval = config.getLong(
ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
- final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval);
+ final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobService, cleanupInterval);
final FiniteDuration timeout;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 6d3a3c4..8841f68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -86,6 +87,9 @@ public class MiniCluster {
private HighAvailabilityServices haServices;
@GuardedBy("lock")
+ private BlobServer blobServer;
+
+ @GuardedBy("lock")
private HeartbeatServices heartbeatServices;
@GuardedBy("lock")
@@ -241,6 +245,8 @@ public class MiniCluster {
configuration,
commonRpcService.getExecutor());
+ blobServer = new BlobServer(configuration, haServices.createBlobStore());
+
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// bring up the ResourceManager(s)
@@ -263,6 +269,7 @@ public class MiniCluster {
jobDispatcher = new MiniClusterJobDispatcher(
configuration,
haServices,
+ blobServer,
heartbeatServices,
metricRegistry,
numJobManagers,
@@ -363,6 +370,16 @@ public class MiniCluster {
taskManagerRpcServices = null;
resourceManagerRpcServices = null;
+ // shut down the blob server
+ if (blobServer != null) {
+ try {
+ blobServer.close();
+ } catch (Exception e) {
+ exception = firstOrSuppressed(e, exception);
+ }
+ blobServer = null;
+ }
+
// shut down high-availability services
if (haServices != null) {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 1f8ae80..2bb94f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.minicluster;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -99,9 +100,17 @@ public class MiniClusterJobDispatcher {
Configuration config,
RpcService rpcService,
HighAvailabilityServices haServices,
+ BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry) throws Exception {
- this(config, haServices, heartbeatServices, metricRegistry, 1, new RpcService[] { rpcService });
+ this(
+ config,
+ haServices,
+ blobServer,
+ heartbeatServices,
+ metricRegistry,
+ 1,
+ new RpcService[] { rpcService });
}
/**
@@ -119,6 +128,7 @@ public class MiniClusterJobDispatcher {
public MiniClusterJobDispatcher(
Configuration config,
HighAvailabilityServices haServices,
+ BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
int numJobManagers,
@@ -135,7 +145,7 @@ public class MiniClusterJobDispatcher {
this.numJobManagers = numJobManagers;
LOG.info("Creating JobMaster services");
- this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+ this.jobManagerServices = JobManagerServices.fromConfiguration(config, blobServer);
}
// ------------------------------------------------------------------------
@@ -285,6 +295,8 @@ public class MiniClusterJobDispatcher {
// to remove the data after job finished
try {
haServices.getRunningJobsRegistry().clearJob(jobID);
+
+ // TODO: Remove job data from BlobServer
}
catch (Throwable t) {
LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, t);
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
new file mode 100644
index 0000000..a7a86c3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the {@link Dispatcher} component.
+ */
+public class DispatcherTest extends TestLogger {
+
+ /**
+ * Tests that submitting a job to the Dispatcher creates and starts a
+ * JobManagerRunner for that job.
+ */
+ @Test
+ public void testJobSubmission() throws Exception {
+ TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+ RpcService rpcService = new TestingRpcService();
+ HighAvailabilityServices haServices = new StandaloneHaServices("localhost", "localhost");
+ HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+ JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class);
+
+ final Time timeout = Time.seconds(5L);
+ final JobGraph jobGraph = mock(JobGraph.class);
+ final JobID jobId = new JobID();
+ when(jobGraph.getJobID()).thenReturn(jobId);
+
+ try {
+ final TestingDispatcher dispatcher = new TestingDispatcher(
+ rpcService,
+ Dispatcher.DISPATCHER_NAME,
+ new Configuration(),
+ haServices,
+ mock(BlobServer.class),
+ heartbeatServices,
+ mock(MetricRegistry.class),
+ fatalErrorHandler,
+ jobManagerRunner,
+ jobId);
+
+ dispatcher.start();
+
+ DispatcherGateway dispatcherGateway = dispatcher.getSelf();
+
+ Future<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+ acknowledgeFuture.get();
+
+ verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
+
+ // check that no error has occurred
+ fatalErrorHandler.rethrowError();
+ } finally {
+ rpcService.stopService();
+ }
+ }
+
+ private static class TestingDispatcher extends Dispatcher {
+
+ private final JobManagerRunner jobManagerRunner;
+ private final JobID expectedJobId;
+
+ protected TestingDispatcher(
+ RpcService rpcService,
+ String endpointId,
+ Configuration configuration,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler,
+ JobManagerRunner jobManagerRunner,
+ JobID expectedJobId) throws Exception {
+ super(
+ rpcService,
+ endpointId,
+ configuration,
+ highAvailabilityServices,
+ blobServer,
+ heartbeatServices,
+ metricRegistry,
+ fatalErrorHandler);
+
+ this.jobManagerRunner = jobManagerRunner;
+ this.expectedJobId = expectedJobId;
+ }
+
+ @Override
+ protected JobManagerRunner createJobManagerRunner(
+ ResourceID resourceId,
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobService blobService,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ OnCompletionActions onCompleteActions,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ assertEquals(expectedJobId, jobGraph.getJobID());
+
+ return jobManagerRunner;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 753c797..7c58879 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -111,7 +112,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
mockRpc,
haServices,
heartbeatServices,
- JobManagerServices.fromConfiguration(new Configuration(), haServices),
+ JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)),
new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
jobCompletion,
jobCompletion));
http://git-wip-us.apache.org/repos/asf/flink/blob/748eba1b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 2ad9065..0107f80 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -99,6 +100,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
private RpcService commonRpcService;
@GuardedBy("lock")
+ private BlobServer blobServer;
+
+ @GuardedBy("lock")
private ResourceManager resourceManager;
@GuardedBy("lock")
@@ -144,6 +148,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
commonRpcService.getExecutor(),
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+ blobServer = new BlobServer(config, haServices.createBlobStore());
+
heartbeatServices = HeartbeatServices.fromConfiguration(config);
metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -228,6 +234,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
config,
commonRpcService,
haServices,
+ blobServer,
heartbeatServices,
this,
this);