You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:07:07 UTC
[14/14] flink git commit: [FLINK-7068][blob] Introduce permanent and
transient BLOB keys
[FLINK-7068][blob] Introduce permanent and transient BLOB keys
[FLINK-7068][blob] address PR review comments, part 1
[FLINK-7068][blob] create a common base class for the BLOB caches
[FLINK-7068][blob] update some comments
[FLINK-7068][blob] integrate the BLOB type into the BlobKey
[FLINK-7068][blob] rename a few methods for better consistency
[FLINK-7068][blob] fix Blob*DeleteTest not working as documented in one test
[FLINK-7068][blob] add checks for jobId being null in PermanentBlobCache
[FLINK-7068][blob] implement get-and-delete logic for transient BLOBs
Transient BLOB files are deleted on the BlobServer upon first access from a
cache. Therefore, we do not need the DELETE operations anymore, aside from
deleting the file from the local cache (for now).
[FLINK-7068][blob] address PR comments, part 2
[FLINK-7068][blob] separate permanent and transient BLOB keys
* create PermanentBlobKey and TransientBlobKey (inheriting from BlobKey) and
forbid using transient BLOBs with permanent caches and vice versa
* make BlobKey package-private, similarly for the BlobType which is now
reflected by the two BlobKey sub-classes
-> this gives a cleaner interface for the user
This closes #4358.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84a07a34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84a07a34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84a07a34
Branch: refs/heads/master
Commit: 84a07a34ac22af14f2dd0319447ca5f45de6d0bb
Parents: b57330d
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Sep 20 12:05:25 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 5 16:06:27 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/hdfstests/HDFSTest.java | 44 +--
.../flink/runtime/blob/AbstractBlobCache.java | 241 +++++++++++++
.../apache/flink/runtime/blob/BlobCache.java | 80 -----
.../flink/runtime/blob/BlobCacheService.java | 96 +++++
.../apache/flink/runtime/blob/BlobClient.java | 190 +++-------
.../flink/runtime/blob/BlobInputStream.java | 31 +-
.../org/apache/flink/runtime/blob/BlobKey.java | 153 ++++++--
.../apache/flink/runtime/blob/BlobServer.java | 96 +++--
.../runtime/blob/BlobServerConnection.java | 214 +++++-------
.../flink/runtime/blob/BlobServerProtocol.java | 35 +-
.../apache/flink/runtime/blob/BlobService.java | 4 +-
.../apache/flink/runtime/blob/BlobUtils.java | 2 +-
.../flink/runtime/blob/FileSystemBlobStore.java | 5 +-
.../flink/runtime/blob/PermanentBlobCache.java | 289 ++++-----------
.../flink/runtime/blob/PermanentBlobKey.java | 45 +++
.../runtime/blob/PermanentBlobService.java | 8 +-
.../flink/runtime/blob/TransientBlobCache.java | 203 ++---------
.../flink/runtime/blob/TransientBlobKey.java | 45 +++
.../runtime/blob/TransientBlobService.java | 41 ++-
.../apache/flink/runtime/client/JobClient.java | 14 +-
.../librarycache/BlobLibraryCacheManager.java | 18 +-
.../FallbackLibraryCacheManager.java | 6 +-
.../librarycache/LibraryCacheManager.java | 6 +-
.../runtime/executiongraph/ExecutionGraph.java | 10 +-
.../runtime/executiongraph/JobInformation.java | 8 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 13 +-
.../slots/ActorTaskManagerGateway.java | 12 +-
.../jobmanager/slots/TaskManagerGateway.java | 6 +-
.../jobmaster/RpcTaskManagerGateway.java | 6 +-
.../jobmaster/message/ClassloadingProps.java | 8 +-
.../handler/legacy/TaskManagerLogHandler.java | 14 +-
.../taskexecutor/JobManagerConnection.java | 12 +-
.../runtime/taskexecutor/TaskExecutor.java | 16 +-
.../apache/flink/runtime/taskmanager/Task.java | 16 +-
.../flink/runtime/jobmanager/JobManager.scala | 2 +-
.../runtime/messages/JobManagerMessages.scala | 4 +-
.../flink/runtime/taskmanager/TaskManager.scala | 10 +-
.../runtime/blob/BlobCacheCleanupTest.java | 104 +++---
.../runtime/blob/BlobCacheCorruptionTest.java | 67 ++--
.../flink/runtime/blob/BlobCacheDeleteTest.java | 176 ++++------
.../flink/runtime/blob/BlobCacheGetTest.java | 306 +++++++++++-----
.../flink/runtime/blob/BlobCachePutTest.java | 350 ++++++++++++-------
.../runtime/blob/BlobCacheRecoveryTest.java | 36 +-
.../runtime/blob/BlobCacheRetriesTest.java | 35 +-
.../runtime/blob/BlobCacheSuccessTest.java | 36 +-
.../flink/runtime/blob/BlobClientTest.java | 117 ++++---
.../apache/flink/runtime/blob/BlobKeyTest.java | 128 ++++++-
.../runtime/blob/BlobServerCorruptionTest.java | 7 +-
.../runtime/blob/BlobServerDeleteTest.java | 81 +++--
.../flink/runtime/blob/BlobServerGetTest.java | 101 +++---
.../flink/runtime/blob/BlobServerPutTest.java | 248 +++++++------
.../runtime/blob/BlobServerRecoveryTest.java | 27 +-
.../flink/runtime/blob/BlobUtilsTest.java | 12 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 6 +-
.../TaskDeploymentDescriptorTest.java | 6 +-
.../BlobLibraryCacheManagerTest.java | 40 +--
.../BlobLibraryCacheRecoveryITCase.java | 17 +-
.../executiongraph/FailoverRegionTest.java | 18 +-
.../executiongraph/GlobalModVersionTest.java | 6 +-
.../IndividualRestartsConcurrencyTest.java | 6 +-
.../PipelinedRegionFailoverConcurrencyTest.java | 6 +-
.../RestartPipelinedRegionStrategyTest.java | 18 +-
.../utils/SimpleAckingTaskManagerGateway.java | 6 +-
.../jobmanager/JobManagerCleanupITCase.java | 8 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 4 +-
.../legacy/TaskManagerLogHandlerTest.java | 4 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 32 +-
.../runtime/taskmanager/TaskAsyncCallTest.java | 18 +-
.../runtime/taskmanager/TaskManagerTest.java | 54 +--
.../flink/runtime/taskmanager/TaskStopTest.java | 12 +-
.../flink/runtime/taskmanager/TaskTest.java | 66 ++--
.../runtime/util/JvmExitOnFatalErrorTest.java | 16 +-
.../partitioner/RescalePartitionerTest.java | 6 +-
.../runtime/tasks/BlockingCheckpointsTest.java | 18 +-
.../tasks/InterruptSensitiveRestoreTest.java | 18 +-
.../tasks/StreamTaskTerminationTest.java | 18 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 18 +-
77 files changed, 2298 insertions(+), 1957 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 92f8413..e4b907a 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -255,21 +254,16 @@ public class HDFSTest {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
- BlobStoreService blobStoreService = null;
+ BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
- blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
} finally {
- if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
- }
+ blobStoreService.closeAndCleanupAllData();
}
}
@@ -282,75 +276,61 @@ public class HDFSTest {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
- BlobStoreService blobStoreService = null;
+ BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
- blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, exception);
} finally {
- if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
- }
+ blobStoreService.closeAndCleanupAllData();
}
}
/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
- * participating BlobServer when uploaded via a {@link org.apache.flink.runtime.blob.BlobCache}.
+ * participating BlobServer when uploaded via a BLOB cache.
*/
@Test
public void testBlobCacheRecovery() throws Exception {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
- BlobStoreService blobStoreService = null;
+ BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
- blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
BlobCacheRecoveryTest.testBlobCacheRecovery(config, blobStoreService);
} finally {
- if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
- }
+ blobStoreService.closeAndCleanupAllData();
}
}
/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
- * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobCache}.
+ * recognised during the download via a BLOB cache.
*/
@Test
public void testBlobCacheCorruptedFile() throws Exception {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
- BlobStoreService blobStoreService = null;
+ BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
- blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
- BlobCacheCorruptionTest.testGetFailsFromCorruptFile(new JobID(), true, true, config, blobStoreService, exception);
+ BlobCacheCorruptionTest
+ .testGetFailsFromCorruptFile(new JobID(), config, blobStoreService, exception);
} finally {
- if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
- }
+ blobStoreService.closeAndCleanupAllData();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
new file mode 100644
index 0000000..dc031e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for the permanent and transient BLOB caches.
+ */
+public abstract class AbstractBlobCache implements Closeable {
+
+	/** The log object used for debugging. */
+	protected final Logger log;
+
+	/** Counter to generate unique names for temporary files. */
+	protected final AtomicLong tempFileCounter = new AtomicLong(0);
+
+	/** Address of the BLOB server to download BLOBs from. */
+	protected final InetSocketAddress serverAddress;
+
+	/** Root directory for local file storage. */
+	protected final File storageDir;
+
+	/** (Distributed) BLOB store view, e.g. in HA, tried before downloading from the BLOB server. */
+	protected final BlobView blobView;
+
+	/** Set to <tt>true</tt> by the first call to {@link #close()}. */
+	protected final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+	/** Shutdown hook thread to ensure deletion of the local storage directory. */
+	protected final Thread shutdownHook;
+
+	/** The number of retries when the transfer fails. */
+	protected final int numFetchRetries;
+
+	/**
+	 * Configuration for the blob client like ssl parameters required to connect to the blob
+	 * server.
+	 */
+	protected final Configuration blobClientConfig;
+
+	/** Lock guarding concurrent file accesses. */
+	protected final ReadWriteLock readWriteLock;
+
+	/**
+	 * Instantiates a new BLOB cache, creating its local storage directory and registering a
+	 * shutdown hook that deletes it.
+	 *
+	 * @param serverAddress address of the {@link BlobServer} to use for fetching files from
+	 * @param blobClientConfig global configuration (storage directory, fetch retries, SSL settings)
+	 * @param blobView (distributed) blob store file system to retrieve files from first
+	 * @param logger logger to use for this cache
+	 *
+	 * @throws IOException
+	 * 		thrown if the local storage directory cannot be created or is not usable
+	 */
+	public AbstractBlobCache(
+			final InetSocketAddress serverAddress,
+			final Configuration blobClientConfig,
+			final BlobView blobView,
+			final Logger logger) throws IOException {
+
+		this.log = checkNotNull(logger);
+
+		this.serverAddress = checkNotNull(serverAddress);
+		this.blobClientConfig = checkNotNull(blobClientConfig);
+		this.blobView = checkNotNull(blobView);
+		this.readWriteLock = new ReentrantReadWriteLock();
+
+		// configure and create the storage directory
+		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
+		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
+		log.info("Created BLOB cache storage directory {}", storageDir);
+
+		// configure the number of fetch retries
+		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
+		if (fetchRetries >= 0) {
+			this.numFetchRetries = fetchRetries;
+		} else {
+			log.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
+				BlobServerOptions.FETCH_RETRIES.key());
+			this.numFetchRetries = 0;
+		}
+
+		// Add shutdown hook to delete storage directory
+		shutdownHook = BlobUtils.addShutdownHook(this, log);
+	}
+
+	/**
+	 * Returns a local copy of the file for the BLOB with the given key.
+	 *
+	 * <p>The method will first attempt to serve the BLOB from its local cache. If the BLOB is not
+	 * in the cache, it is copied from the (distributed) BLOB store view if available there and,
+	 * otherwise, downloaded directly from this cache's BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param blobKey
+	 * 		key of the desired BLOB (must not be <tt>null</tt>)
+	 *
+	 * @return file referring to the local storage location of the BLOB.
+	 *
+	 * @throws IOException
+	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
+		checkArgument(blobKey != null, "BLOB key cannot be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+		readWriteLock.readLock().lock();
+
+		try {
+			if (localFile.exists()) {
+				return localFile;
+			}
+		} finally {
+			readWriteLock.readLock().unlock();
+		}
+
+		// first try the distributed blob store (if available)
+		// use a temporary file (thread-safe without locking)
+		File incomingFile = createTemporaryFilename();
+		try {
+			try {
+				if (blobView.get(jobId, blobKey, incomingFile)) {
+					// now move the temp file to our local cache atomically
+					BlobUtils.moveTempFileToStore(
+						incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+
+					return localFile;
+				}
+			} catch (Exception e) {
+				log.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
+			}
+
+			// fallback: download from the BlobServer
+			BlobClient.downloadFromBlobServer(
+				jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);
+
+			BlobUtils.moveTempFileToStore(
+				incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+
+			return localFile;
+		} finally {
+			// delete incomingFile left over from a failed download (if any)
+			if (!incomingFile.delete() && incomingFile.exists()) {
+				log.warn("Could not delete the staging file {} for blob key {} and job {}.",
+					incomingFile, blobKey, jobId);
+			}
+		}
+	}
+
+	/**
+	 * Returns the port the BLOB server is listening on.
+	 *
+	 * @return BLOB server port
+	 */
+	public int getPort() {
+		return serverAddress.getPort();
+	}
+
+	/**
+	 * Returns a temporary file inside the BLOB cache's incoming directory.
+	 *
+	 * @return a temporary file inside the BLOB cache's incoming directory
+	 *
+	 * @throws IOException
+	 * 		if creating the directory fails
+	 */
+	File createTemporaryFilename() throws IOException {
+		return new File(BlobUtils.getIncomingDirectory(storageDir),
+			String.format("temp-%08d", tempFileCounter.getAndIncrement()));
+	}
+
+	@Override
+	public void close() throws IOException {
+		cancelCleanupTask();
+
+		if (shutdownRequested.compareAndSet(false, true)) {
+			log.info("Shutting down BLOB cache");
+
+			// Clean up the storage directory
+			try {
+				FileUtils.deleteDirectory(storageDir);
+			} finally {
+				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+					try {
+						Runtime.getRuntime().removeShutdownHook(shutdownHook);
+					} catch (IllegalStateException e) {
+						// race, JVM is in shutdown already, we can safely ignore this
+					} catch (Throwable t) {
+						log.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.", t);
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Cancels any cleanup task that subclasses may be executing.
+	 *
+	 * <p>This is called during {@link #close()}.
+	 */
+	protected abstract void cancelCleanupTask();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
deleted file mode 100644
index 2a20015..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import org.apache.flink.configuration.Configuration;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/**
- * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
- */
-public class BlobCache implements BlobService {
-
- /** Caching store for permanent BLOBs. */
- private final PermanentBlobCache permanentBlobStore;
-
- /** Store for transient BLOB files. */
- private final TransientBlobCache transientBlobStore;
-
- /**
- * Instantiates a new BLOB cache.
- *
- * @param serverAddress
- * address of the {@link BlobServer} to use for fetching files from
- * @param blobClientConfig
- * global configuration
- * @param blobView
- * (distributed) blob store file system to retrieve files from first
- *
- * @throws IOException
- * thrown if the (local or distributed) file storage cannot be created or is not usable
- */
- public BlobCache(
- final InetSocketAddress serverAddress,
- final Configuration blobClientConfig,
- final BlobView blobView) throws IOException {
-
- this.permanentBlobStore = new PermanentBlobCache(serverAddress, blobClientConfig, blobView);
- this.transientBlobStore = new TransientBlobCache(serverAddress, blobClientConfig);
- }
-
- @Override
- public PermanentBlobCache getPermanentBlobStore() {
- return permanentBlobStore;
- }
-
- @Override
- public TransientBlobCache getTransientBlobStore() {
- return transientBlobStore;
- }
-
- @Override
- public void close() throws IOException {
- permanentBlobStore.close();
- transientBlobStore.close();
- }
-
- @Override
- public int getPort() {
- // NOTE: both blob stores connect to the same server!
- return permanentBlobStore.getPort();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java
new file mode 100644
index 0000000..89ce2c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
+ */
+public class BlobCacheService implements BlobService {
+
+	/** Caching store for permanent BLOBs. */
+	private final PermanentBlobCache permanentBlobCache;
+
+	/** Store for transient BLOB files. */
+	private final TransientBlobCache transientBlobCache;
+
+	/**
+	 * Instantiates a new BLOB cache.
+	 *
+	 * <p>If creating the transient BLOB cache fails, the already created permanent BLOB cache is
+	 * closed again so that its resources are not leaked.
+	 *
+	 * @param serverAddress
+	 * 		address of the {@link BlobServer} to use for fetching files from
+	 * @param blobClientConfig
+	 * 		global configuration
+	 * @param blobView
+	 * 		(distributed) blob store file system to retrieve files from first
+	 *
+	 * @throws IOException
+	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
+	 */
+	public BlobCacheService(
+			final InetSocketAddress serverAddress,
+			final Configuration blobClientConfig,
+			final BlobView blobView) throws IOException {
+
+		this.permanentBlobCache = new PermanentBlobCache(serverAddress, blobClientConfig, blobView);
+		try {
+			this.transientBlobCache = new TransientBlobCache(serverAddress, blobClientConfig);
+		} catch (Throwable t) {
+			// do not leak the permanent cache's local storage directory and shutdown hook
+			try {
+				permanentBlobCache.close();
+			} catch (Throwable closeFailure) {
+				t.addSuppressed(closeFailure);
+			}
+			throw t;
+		}
+	}
+
+	/**
+	 * Instantiates a new BLOB cache.
+	 *
+	 * @param permanentBlobCache
+	 * 		BLOB cache to use for permanent BLOBs
+	 * @param transientBlobCache
+	 * 		BLOB cache to use for transient BLOBs
+	 */
+	public BlobCacheService(
+			PermanentBlobCache permanentBlobCache, TransientBlobCache transientBlobCache) {
+		this.permanentBlobCache = checkNotNull(permanentBlobCache);
+		this.transientBlobCache = checkNotNull(transientBlobCache);
+	}
+
+	@Override
+	public PermanentBlobCache getPermanentBlobService() {
+		return permanentBlobCache;
+	}
+
+	@Override
+	public TransientBlobCache getTransientBlobService() {
+		return transientBlobCache;
+	}
+
+	/**
+	 * Closes both the permanent and the transient BLOB cache.
+	 *
+	 * <p>The transient cache is closed even if closing the permanent cache fails; in that case the
+	 * permanent cache's exception is rethrown with any failure of the transient cache attached as
+	 * a suppressed exception.
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			permanentBlobCache.close();
+		} catch (Throwable t) {
+			try {
+				transientBlobCache.close();
+			} catch (Throwable suppressed) {
+				t.addSuppressed(suppressed);
+			}
+			throw t;
+		}
+
+		transientBlobCache.close();
+	}
+
+	@Override
+	public int getPort() {
+		// NOTE: both blob stores connect to the same server!
+		return permanentBlobCache.getPort();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index a8ae471..3154f69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.InstantiationUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
+
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
@@ -49,14 +51,13 @@ import java.util.Collections;
import java.util.List;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB_HA;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
@@ -76,7 +77,7 @@ public final class BlobClient implements Closeable {
/**
* Instantiates a new BLOB client.
- *
+ *
* @param serverAddress
* the network address of the BLOB server
* @param clientConfig
@@ -126,12 +127,13 @@ public final class BlobClient implements Closeable {
/**
* Downloads the given BLOB from the given server and stores its contents to a (local) file.
*
+ * <p>Transient BLOB files are deleted after a successful copy of the server's data into the
+ * given <tt>localJarFile</tt>.
+ *
* @param jobId
* job ID the BLOB belongs to or <tt>null</tt> if job-unrelated
* @param blobKey
* BLOB key
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
* @param localJarFile
* the local file to write to
* @param serverAddress
@@ -145,9 +147,12 @@ public final class BlobClient implements Closeable {
* if an I/O error occurs during the download
*/
static void downloadFromBlobServer(
- @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob, File localJarFile,
- InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
- throws IOException {
+ @Nullable JobID jobId,
+ BlobKey blobKey,
+ File localJarFile,
+ InetSocketAddress serverAddress,
+ Configuration blobClientConfig,
+ int numFetchRetries) throws IOException {
final byte[] buf = new byte[BUFFER_SIZE];
LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress);
@@ -157,7 +162,7 @@ public final class BlobClient implements Closeable {
while (true) {
try (
final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
- final InputStream is = bc.getInternal(jobId, blobKey, permanentBlob);
+ final InputStream is = bc.getInternal(jobId, blobKey);
final OutputStream os = new FileOutputStream(localJarFile)
) {
while (true) {
@@ -168,7 +173,6 @@ public final class BlobClient implements Closeable {
os.write(buf, 0, read);
}
- // success, we finished
return;
}
catch (Throwable t) {
@@ -176,7 +180,7 @@ public final class BlobClient implements Closeable {
" and store it under " + localJarFile.getAbsolutePath();
if (attempt < numFetchRetries) {
if (LOG.isDebugEnabled()) {
- LOG.debug(message + " Retrying...", t);
+ LOG.error(message + " Retrying...", t);
} else {
LOG.error(message + " Retrying...");
}
@@ -213,8 +217,6 @@ public final class BlobClient implements Closeable {
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey
* blob key associated with the requested file
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
*
* @return an input stream to read the retrieved data from
*
@@ -223,7 +225,7 @@ public final class BlobClient implements Closeable {
* @throws IOException
* if an I/O error occurs during the download
*/
- InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob)
+ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey)
throws IOException {
if (this.socket.isClosed()) {
@@ -240,10 +242,10 @@ public final class BlobClient implements Closeable {
InputStream is = this.socket.getInputStream();
// Send GET header
- sendGetHeader(os, jobId, blobKey, permanentBlob);
+ sendGetHeader(os, jobId, blobKey);
receiveAndCheckGetResponse(is);
- return new BlobInputStream(is, blobKey);
+ return new BlobInputStream(is, blobKey, os);
}
catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
@@ -260,36 +262,32 @@ public final class BlobClient implements Closeable {
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey
* blob key associated with the requested file
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
*
* @throws IOException
* thrown if an I/O error occurs while writing the header data to the output stream
*/
private static void sendGetHeader(
- OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob)
+ OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey)
throws IOException {
checkNotNull(blobKey);
- checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related");
+ checkArgument(jobId != null || blobKey instanceof TransientBlobKey,
+ "permanent BLOBs must be job-related");
// Signal type of operation
outputStream.write(GET_OPERATION);
// Send job ID and key
if (jobId == null) {
- outputStream.write(CONTENT_NO_JOB);
- } else if (permanentBlob) {
- outputStream.write(CONTENT_FOR_JOB_HA);
- outputStream.write(jobId.getBytes());
+ outputStream.write(JOB_UNRELATED_CONTENT);
} else {
- outputStream.write(CONTENT_FOR_JOB);
+ outputStream.write(JOB_RELATED_CONTENT);
outputStream.write(jobId.getBytes());
}
blobKey.writeToOutputStream(outputStream);
}
/**
- * Reads the response from the input stream and throws in case of errors
+ * Reads the response from the input stream and throws in case of errors.
*
* @param is
* stream to read from
@@ -327,8 +325,8 @@ public final class BlobClient implements Closeable {
* the read offset within the buffer
* @param len
* the number of bytes to read from the buffer
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*
* @return the computed BLOB key of the uploaded BLOB
*
@@ -336,7 +334,7 @@ public final class BlobClient implements Closeable {
* thrown if an I/O error occurs while uploading the data to the BLOB server
*/
BlobKey putBuffer(
- @Nullable JobID jobId, byte[] value, int offset, int len, boolean permanentBlob)
+ @Nullable JobID jobId, byte[] value, int offset, int len, BlobKey.BlobType blobType)
throws IOException {
if (this.socket.isClosed()) {
@@ -354,7 +352,7 @@ public final class BlobClient implements Closeable {
final MessageDigest md = BlobUtils.createMessageDigest();
// Send the PUT header
- sendPutHeader(os, jobId, permanentBlob);
+ sendPutHeader(os, jobId, blobType);
// Send the value in iterations of BUFFER_SIZE
int remainingBytes = len;
@@ -378,7 +376,7 @@ public final class BlobClient implements Closeable {
// Receive blob key and compare
final InputStream is = this.socket.getInputStream();
- return receiveAndCheckPutResponse(is, md);
+ return receiveAndCheckPutResponse(is, md, blobType);
}
catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
@@ -393,15 +391,15 @@ public final class BlobClient implements Closeable {
* the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
* @param inputStream
* the input stream to read the data from
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*
* @return the computed BLOB key of the uploaded BLOB
*
* @throws IOException
* thrown if an I/O error occurs while uploading the data to the BLOB server
*/
- BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, boolean permanentBlob)
+ BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
throws IOException {
if (this.socket.isClosed()) {
@@ -420,7 +418,7 @@ public final class BlobClient implements Closeable {
final byte[] xferBuf = new byte[BUFFER_SIZE];
// Send the PUT header
- sendPutHeader(os, jobId, permanentBlob);
+ sendPutHeader(os, jobId, blobType);
while (true) {
// since we don't know a total size here, send lengths iteratively
@@ -439,7 +437,7 @@ public final class BlobClient implements Closeable {
// Receive blob key and compare
final InputStream is = this.socket.getInputStream();
- return receiveAndCheckPutResponse(is, md);
+ return receiveAndCheckPutResponse(is, md, blobType);
}
catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
@@ -454,41 +452,42 @@ public final class BlobClient implements Closeable {
* the output stream to write the PUT header data to
* @param jobId
* the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*
* @throws IOException
* thrown if an I/O error occurs while writing the header data to the output stream
*/
private static void sendPutHeader(
- OutputStream outputStream, @Nullable JobID jobId, boolean permanentBlob)
+ OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType)
throws IOException {
// Signal type of operation
outputStream.write(PUT_OPERATION);
if (jobId == null) {
- outputStream.write(CONTENT_NO_JOB);
- } else if (permanentBlob) {
- outputStream.write(CONTENT_FOR_JOB_HA);
- outputStream.write(jobId.getBytes());
+ outputStream.write(JOB_UNRELATED_CONTENT);
} else {
- outputStream.write(CONTENT_FOR_JOB);
+ outputStream.write(JOB_RELATED_CONTENT);
outputStream.write(jobId.getBytes());
}
+ outputStream.write(blobType.ordinal());
}
/**
- * Reads the response from the input stream and throws in case of errors
+ * Reads the response from the input stream and throws in case of errors.
*
* @param is
* stream to read from
* @param md
* message digest to check the response against
+ * @param blobType
+ * whether the BLOB should be permanent or transient
*
* @throws IOException
* if the response is an error, the message digest does not match or reading the response
* failed
*/
- private static BlobKey receiveAndCheckPutResponse(InputStream is, MessageDigest md)
+ private static BlobKey receiveAndCheckPutResponse(
+ InputStream is, MessageDigest md, BlobKey.BlobType blobType)
throws IOException {
int response = is.read();
if (response < 0) {
@@ -497,7 +496,7 @@ public final class BlobClient implements Closeable {
else if (response == RETURN_OKAY) {
BlobKey remoteKey = BlobKey.readFromInputStream(is);
- BlobKey localKey = new BlobKey(md.digest());
+ BlobKey localKey = BlobKey.createKey(blobType, md.digest());
if (!localKey.equals(remoteKey)) {
throw new IOException("Detected data corruption during transfer");
@@ -514,86 +513,6 @@ public final class BlobClient implements Closeable {
}
}
- // --------------------------------------------------------------------------------------------
- // DELETE
- // --------------------------------------------------------------------------------------------
-
- /**
- * Deletes the (transient) BLOB identified by the given BLOB key and job ID from the BLOB
- * server.
- *
- * @param jobId
- * the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
- * @param key
- * the key to identify the BLOB
- *
- * @return <tt>true</tt> if the delete operation was successful at the {@link BlobServer};
- * <tt>false</tt> otherwise
- *
- * @throws IOException
- * thrown if an I/O error occurs while transferring the request to the BLOB server or if the
- * BLOB server throws an exception while processing the request
- */
- boolean deleteInternal(@Nullable JobID jobId, BlobKey key)
- throws IOException {
-
- checkNotNull(key);
-
- try {
- final OutputStream outputStream = this.socket.getOutputStream();
- final InputStream inputStream = this.socket.getInputStream();
-
- // Signal type of operation
- outputStream.write(DELETE_OPERATION);
-
- // delete blob key
- if (jobId == null) {
- outputStream.write(CONTENT_NO_JOB);
- } else {
- outputStream.write(CONTENT_FOR_JOB);
- outputStream.write(jobId.getBytes());
- }
- key.writeToOutputStream(outputStream);
-
- return receiveAndCheckDeleteResponse(inputStream);
- }
- catch (Throwable t) {
- BlobUtils.closeSilently(socket, LOG);
- throw new IOException("DELETE operation failed: " + t.getMessage(), t);
- }
- }
-
- /**
- * Reads the response from the input stream and throws in case of errors
- *
- * @param is
- * stream to read from
- *
- * @return <tt>true</tt> if the delete operation was successful at the {@link BlobServer};
- * <tt>false</tt> otherwise
- *
- * @throws IOException
- * if the server code throws an exception or if reading the response failed
- */
- private static boolean receiveAndCheckDeleteResponse(InputStream is) throws IOException {
- int response = is.read();
- if (response < 0) {
- throw new EOFException("Premature end of response");
- }
- if (response == RETURN_ERROR) {
- Throwable cause = readExceptionFromStream(is);
- if (cause == null) {
- return false;
- } else {
- throw new IOException("Server side error: " + cause.getMessage(), cause);
- }
- }
- else if (response != RETURN_OKAY) {
- throw new IOException("Unrecognized response");
- }
- return true;
- }
-
/**
* Uploads the JAR files to the {@link PermanentBlobService} of the {@link BlobServer} at the
* given address with HA as configured.
@@ -610,12 +529,16 @@ public final class BlobClient implements Closeable {
* @throws IOException
* if the upload fails
*/
- public static List<BlobKey> uploadJarFiles(InetSocketAddress serverAddress,
- Configuration clientConfig, JobID jobId, List<Path> jars) throws IOException {checkNotNull(jobId);
+ public static List<PermanentBlobKey> uploadJarFiles(
+ InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
+ throws IOException {
+
+ checkNotNull(jobId);
+
if (jars.isEmpty()) {
return Collections.emptyList();
} else {
- List<BlobKey> blobKeys = new ArrayList<>();
+ List<PermanentBlobKey> blobKeys = new ArrayList<>();
try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
for (final Path jar : jars) {
@@ -623,7 +546,8 @@ public final class BlobClient implements Closeable {
FSDataInputStream is = null;
try {
is = fs.open(jar);
- final BlobKey key = blobClient.putInputStream(jobId, is, true);
+ final PermanentBlobKey key =
+ (PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
blobKeys.add(key);
} finally {
if (is != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
index a89a461..7a73917 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
@@ -22,8 +22,12 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.security.MessageDigest;
+import java.util.Arrays;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
/**
@@ -38,6 +42,14 @@ final class BlobInputStream extends InputStream {
private final InputStream wrappedInputStream;
/**
+ * The wrapped output stream from the underlying TCP connection.
+ *
+ * <p>This is used to signal the success or failure of the read operation after receiving the
+ * whole BLOB and verifying the checksum.
+ */
+ private final OutputStream wrappedOutputStream;
+
+ /**
* The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise <code>null<code>.
*/
private final BlobKey blobKey;
@@ -65,12 +77,17 @@ final class BlobInputStream extends InputStream {
* the underlying input stream to read from
* @param blobKey
* the expected BLOB key for content-addressable BLOBs, <code>null</code> for non-content-addressable BLOBs.
+ * @param wrappedOutputStream
+ * the underlying output stream to write the result to
+ *
* @throws IOException
* throws if an I/O error occurs while reading the BLOB data from the BLOB server
*/
- BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey) throws IOException {
+ BlobInputStream(
+ final InputStream wrappedInputStream, final BlobKey blobKey, OutputStream wrappedOutputStream) throws IOException {
this.wrappedInputStream = wrappedInputStream;
this.blobKey = blobKey;
+ this.wrappedOutputStream = wrappedOutputStream;
this.bytesToReceive = readLength(wrappedInputStream);
if (this.bytesToReceive < 0) {
throw new FileNotFoundException();
@@ -106,10 +123,12 @@ final class BlobInputStream extends InputStream {
if (this.md != null) {
this.md.update((byte) read);
if (this.bytesReceived == this.bytesToReceive) {
- final BlobKey computedKey = new BlobKey(this.md.digest());
- if (!computedKey.equals(this.blobKey)) {
+ final byte[] computedKey = this.md.digest();
+ if (!Arrays.equals(computedKey, this.blobKey.getHash())) {
+ this.wrappedOutputStream.write(RETURN_ERROR);
throw new IOException("Detected data corruption during transfer");
}
+ this.wrappedOutputStream.write(RETURN_OKAY);
}
}
@@ -140,10 +159,12 @@ final class BlobInputStream extends InputStream {
if (this.md != null) {
this.md.update(b, off, read);
if (this.bytesReceived == this.bytesToReceive) {
- final BlobKey computedKey = new BlobKey(this.md.digest());
- if (!computedKey.equals(this.blobKey)) {
+ final byte[] computedKey = this.md.digest();
+ if (!Arrays.equals(computedKey, this.blobKey.getHash())) {
+ this.wrappedOutputStream.write(RETURN_ERROR);
throw new IOException("Detected data corruption during transfer");
}
+ this.wrappedOutputStream.write(RETURN_OKAY);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index bd254dd..0aa45e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -26,38 +29,67 @@ import java.io.Serializable;
import java.security.MessageDigest;
import java.util.Arrays;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A BLOB key uniquely identifies a BLOB.
*/
-public final class BlobKey implements Serializable, Comparable<BlobKey> {
+abstract class BlobKey implements Serializable, Comparable<BlobKey> {
private static final long serialVersionUID = 3847117712521785209L;
- /** Array of hex characters to facilitate fast toString() method. */
- private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();
-
/** Size of the internal BLOB key in bytes. */
private static final int SIZE = 20;
-
/** The byte buffer storing the actual key data. */
private final byte[] key;
/**
+ * (Internal) BLOB type - to be reflected by the inheriting sub-class.
+ */
+ private final BlobType type;
+
+ /**
+ * BLOB type, i.e. permanent or transient.
+ */
+ enum BlobType {
+ /**
+ * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
+ * available.
+ */
+ PERMANENT_BLOB,
+ /**
+ * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
+ * highly available.
+ */
+ TRANSIENT_BLOB
+ }
+
+ /**
* Constructs a new BLOB key.
+ *
+ * @param type
+ * whether the referenced BLOB is permanent or transient
*/
- public BlobKey() {
+ protected BlobKey(BlobType type) {
+ this.type = checkNotNull(type);
this.key = new byte[SIZE];
}
/**
* Constructs a new BLOB key from the given byte array.
- *
+ *
+ * @param type
+ * whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
*/
- BlobKey(byte[] key) {
- if (key.length != SIZE) {
+ protected BlobKey(BlobType type, byte[] key) {
+ this.type = checkNotNull(type);
+
+ if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}
@@ -65,6 +97,50 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
}
/**
+ * Creates a new {@link BlobKey} of the subclass matching the given BLOB type.
+ *
+ * @param type
+ * whether the referenced BLOB is permanent or transient
+ *
+ * @return a {@link PermanentBlobKey} for PERMANENT_BLOB, otherwise (even for a null type) a {@link TransientBlobKey}
+ */
+ @VisibleForTesting
+ static BlobKey createKey(BlobType type) {
+ if (type == PERMANENT_BLOB) {
+ return new PermanentBlobKey();
+ } else {
+ return new TransientBlobKey();
+ }
+ }
+
+ /**
+ * Creates a {@link BlobKey} of the subclass matching the given BLOB type from the given key data.
+ *
+ * @param type
+ * whether the referenced BLOB is permanent or transient
+ * @param key
+ * the actual key data
+ *
+ * @return a {@link PermanentBlobKey} for PERMANENT_BLOB, otherwise (even for a null type) a {@link TransientBlobKey}
+ */
+ static BlobKey createKey(BlobType type, byte[] key) {
+ if (type == PERMANENT_BLOB) {
+ return new PermanentBlobKey(key);
+ } else {
+ return new TransientBlobKey(key);
+ }
+ }
+
+ /**
+ * Returns the hash component of this key (the internal array itself, not a copy - do not modify).
+ *
+ * @return the 20 byte hash of the contents the key refers to
+ */
+ byte[] getHash() {
+ return key;
+ }
+
+ /**
* Adds the BLOB key to the given {@link MessageDigest}.
*
* @param md
@@ -83,30 +159,36 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
final BlobKey bk = (BlobKey) obj;
- return Arrays.equals(this.key, bk.key);
+ return Arrays.equals(this.key, bk.key) && this.type == bk.type;
}
@Override
public int hashCode() {
- return Arrays.hashCode(this.key);
+ int result = Arrays.hashCode(this.key);
+ result = 37 * result + this.type.hashCode();
+ return result;
}
@Override
public String toString() {
- // from http://stackoverflow.com/questions/9655181/convert-from-byte-array-to-hex-string-in-java
- final char[] hexChars = new char[SIZE * 2];
- for (int i = 0; i < SIZE; ++i) {
- int v = this.key[i] & 0xff;
- hexChars[i * 2] = HEX_ARRAY[v >>> 4];
- hexChars[i * 2 + 1] = HEX_ARRAY[v & 0x0f];
+ final String typeString;
+ switch (this.type) {
+ case TRANSIENT_BLOB:
+ typeString = "t-";
+ break;
+ case PERMANENT_BLOB:
+ typeString = "p-";
+ break;
+ default:
+ // this actually never happens!
+ throw new IllegalStateException("Invalid BLOB type");
}
-
- return new String(hexChars);
+ return typeString + StringUtils.byteToHexString(this.key);
}
@Override
public int compareTo(BlobKey o) {
-
+ // compare the hashes first
final byte[] aarr = this.key;
final byte[] barr = o.key;
final int len = Math.min(aarr.length, barr.length);
@@ -118,8 +200,13 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
return a - b;
}
}
-
- return aarr.length - barr.length;
+
+ if (aarr.length == barr.length) {
+ // same hash contents - compare the BLOB types
+ return this.type.compareTo(o.type);
+ } else {
+ return aarr.length - barr.length;
+ }
}
// --------------------------------------------------------------------------------------------
@@ -138,15 +225,30 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
final byte[] key = new byte[BlobKey.SIZE];
int bytesRead = 0;
- while (bytesRead < BlobKey.SIZE) {
- final int read = inputStream.read(key, bytesRead, BlobKey.SIZE - bytesRead);
+ // read key
+ while (bytesRead < key.length) {
+ final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
if (read < 0) {
throw new EOFException("Read an incomplete BLOB key");
}
bytesRead += read;
}
+ // read BLOB type
+ final BlobType blobType;
+ {
+ final int read = inputStream.read();
+ if (read < 0) {
+ throw new EOFException("Read an incomplete BLOB type");
+ } else if (read == TRANSIENT_BLOB.ordinal()) {
+ blobType = TRANSIENT_BLOB;
+ } else if (read == PERMANENT_BLOB.ordinal()) {
+ blobType = PERMANENT_BLOB;
+ } else {
+ throw new IOException("Invalid data received for the BLOB type: " + read);
+ }
+ }
- return new BlobKey(key);
+ return createKey(blobType, key);
}
/**
@@ -159,5 +261,6 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
*/
void writeToOutputStream(final OutputStream outputStream) throws IOException {
outputStream.write(this.key);
+ outputStream.write(this.type.ordinal());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 836d436..7804dfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,11 +26,13 @@ import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -50,6 +52,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -352,8 +356,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* Thrown if the file retrieval failed.
*/
@Override
- public File getFile(BlobKey key) throws IOException {
- return getFileInternal(null, key, false);
+ public File getFile(TransientBlobKey key) throws IOException {
+ return getFileInternal(null, key);
}
/**
@@ -374,9 +378,9 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* Thrown if the file retrieval failed.
*/
@Override
- public File getFile(JobID jobId, BlobKey key) throws IOException {
+ public File getFile(JobID jobId, TransientBlobKey key) throws IOException {
checkNotNull(jobId);
- return getFileInternal(jobId, key, false);
+ return getFileInternal(jobId, key);
}
/**
@@ -399,9 +403,9 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* if any other error occurs when retrieving the file
*/
@Override
- public File getHAFile(JobID jobId, BlobKey key) throws IOException {
+ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
checkNotNull(jobId);
- return getFileInternal(jobId, key, true);
+ return getFileInternal(jobId, key);
}
/**
@@ -415,22 +419,20 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey
* blob key associated with the requested file
- * @param highlyAvailable
- * whether to the requested file is highly available (HA)
*
* @return file referring to the local storage location of the BLOB
*
* @throws IOException
* Thrown if the file retrieval failed.
*/
- private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
+ private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
checkArgument(blobKey != null, "BLOB key cannot be null.");
final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
readWriteLock.readLock().lock();
try {
- getFileInternal(jobId, blobKey, highlyAvailable, localFile);
+ getFileInternal(jobId, blobKey, localFile);
return localFile;
} finally {
readWriteLock.readLock().unlock();
@@ -450,20 +452,18 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey
* blob key associated with the requested file
- * @param highlyAvailable
- * whether to the requested file is highly available (HA)
* @param localFile
* (local) file where the blob is/should be stored
*
* @throws IOException
* Thrown if the file retrieval failed.
*/
- void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException {
+ void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile) throws IOException {
// assume readWriteLock.readLock() was already locked (cannot really check that)
if (localFile.exists()) {
return;
- } else if (highlyAvailable) {
+ } else if (blobKey instanceof PermanentBlobKey) {
// Try the HA blob store
// first we have to release the read lock in order to acquire the write lock
readWriteLock.readLock().unlock();
@@ -495,30 +495,30 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
}
@Override
- public BlobKey put(byte[] value) throws IOException {
- return putBuffer(null, value, false);
+ public TransientBlobKey putTransient(byte[] value) throws IOException {
+ return (TransientBlobKey) putBuffer(null, value, TRANSIENT_BLOB);
}
@Override
- public BlobKey put(JobID jobId, byte[] value) throws IOException {
+ public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException {
checkNotNull(jobId);
- return putBuffer(jobId, value, false);
+ return (TransientBlobKey) putBuffer(jobId, value, TRANSIENT_BLOB);
}
@Override
- public BlobKey put(InputStream inputStream) throws IOException {
- return putInputStream(null, inputStream, false);
+ public TransientBlobKey putTransient(InputStream inputStream) throws IOException {
+ return (TransientBlobKey) putInputStream(null, inputStream, TRANSIENT_BLOB);
}
@Override
- public BlobKey put(JobID jobId, InputStream inputStream) throws IOException {
+ public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException {
checkNotNull(jobId);
- return putInputStream(jobId, inputStream, false);
+ return (TransientBlobKey) putInputStream(jobId, inputStream, TRANSIENT_BLOB);
}
/**
* Uploads the data of the given byte array for the given job to the BLOB server and makes it
- * highly available (HA).
+ * a permanent BLOB.
*
* @param jobId
* the ID of the job the BLOB belongs to
@@ -531,14 +531,14 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
* store
*/
- public BlobKey putHA(JobID jobId, byte[] value) throws IOException {
+ public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
checkNotNull(jobId);
- return putBuffer(jobId, value, true);
+ return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
}
/**
* Uploads the data from the given input stream for the given job to the BLOB server and makes it
- * highly available (HA).
+ * a permanent BLOB.
*
* @param jobId
* ID of the job this blob belongs to
@@ -551,9 +551,9 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* thrown if an I/O error occurs while reading the data from the input stream, writing it to a
* local file, or uploading it to the HA store
*/
- public BlobKey putHA(JobID jobId, InputStream inputStream) throws IOException {
+ public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
checkNotNull(jobId);
- return putInputStream(jobId, inputStream, true);
+ return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
}
/**
@@ -563,8 +563,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* the ID of the job the BLOB belongs to
* @param value
* the buffer to upload
- * @param highlyAvailable
- * whether to make the data highly available (HA)
+ * @param blobType
+ * whether to make the data permanent or transient
*
* @return the computed BLOB key identifying the BLOB on the server
*
@@ -572,7 +572,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
* store
*/
- private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, boolean highlyAvailable)
+ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
throws IOException {
if (LOG.isDebugEnabled()) {
@@ -586,10 +586,10 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
md.update(value);
fos.write(value);
- blobKey = new BlobKey(md.digest());
+ blobKey = BlobKey.createKey(blobType, md.digest());
// persist file
- moveTempFileToStore(incomingFile, jobId, blobKey, highlyAvailable);
+ moveTempFileToStore(incomingFile, jobId, blobKey);
return blobKey;
} finally {
@@ -609,8 +609,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* the ID of the job the BLOB belongs to
* @param inputStream
* the input stream to read the data from
- * @param highlyAvailable
- * whether to make the data highly available (HA)
+ * @param blobType
+ * whether to make the data permanent or transient
*
* @return the computed BLOB key identifying the BLOB on the server
*
@@ -619,7 +619,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* local file, or uploading it to the HA store
*/
private BlobKey putInputStream(
- @Nullable JobID jobId, InputStream inputStream, boolean highlyAvailable)
+ @Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
throws IOException {
if (LOG.isDebugEnabled()) {
@@ -642,10 +642,10 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
md.update(buf, 0, bytesRead);
}
- blobKey = new BlobKey(md.digest());
+ blobKey = BlobKey.createKey(blobType, md.digest());
// persist file
- moveTempFileToStore(incomingFile, jobId, blobKey, highlyAvailable);
+ moveTempFileToStore(incomingFile, jobId, blobKey);
return blobKey;
} finally {
@@ -667,21 +667,18 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
* @param blobKey
* BLOB key identifying the file
- * @param highlyAvailable
- * whether this file should be stored in the HA store
*
* @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/
void moveTempFileToStore(
- File incomingFile, @Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable)
- throws IOException {
+ File incomingFile, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, storageFile, readWriteLock.writeLock(), LOG,
- highlyAvailable ? blobStore : null);
+ blobKey instanceof PermanentBlobKey ? blobStore : null);
}
/**
@@ -695,7 +692,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* <tt>false</tt> otherwise
*/
@Override
- public boolean delete(BlobKey key) {
+ public boolean deleteFromCache(TransientBlobKey key) {
return deleteInternal(null, key);
}
@@ -711,7 +708,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* <tt>false</tt> otherwise
*/
@Override
- public boolean delete(JobID jobId, BlobKey key) {
+ public boolean deleteFromCache(JobID jobId, TransientBlobKey key) {
checkNotNull(jobId);
return deleteInternal(jobId, key);
}
@@ -727,7 +724,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* @return <tt>true</tt> if the given blob is successfully deleted or non-existing;
* <tt>false</tt> otherwise
*/
- boolean deleteInternal(@Nullable JobID jobId, BlobKey key) {
+ boolean deleteInternal(@Nullable JobID jobId, TransientBlobKey key) {
final File localFile =
new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
@@ -781,14 +778,13 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
}
}
-
@Override
- public PermanentBlobService getPermanentBlobStore() {
+ public PermanentBlobService getPermanentBlobService() {
return this;
}
@Override
- public TransientBlobService getTransientBlobStore() {
+ public TransientBlobService getTransientBlobService() {
return this;
}
@@ -812,7 +808,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
}
/**
- * Access to the server socket, for testing
+ * Access to the server socket, for testing.
*/
ServerSocket getServerSocket() {
return this.serverSocket;
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 28d006a..be62581 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.util.InstantiationUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,18 +38,19 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB_HA;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -65,12 +67,12 @@ class BlobServerConnection extends Thread {
/** The BLOB server. */
private final BlobServer blobServer;
- /** Read lock to synchronize file accesses */
+ /** Read lock to synchronize file accesses. */
private final Lock readLock;
/**
- * Creates a new BLOB connection for a client request
- *
+ * Creates a new BLOB connection for a client request.
+ *
* @param clientSocket The socket to read/write data.
* @param blobServer The BLOB server.
*/
@@ -114,9 +116,6 @@ class BlobServerConnection extends Thread {
case GET_OPERATION:
get(inputStream, outputStream, new byte[BUFFER_SIZE]);
break;
- case DELETE_OPERATION:
- delete(inputStream, outputStream);
- break;
default:
throw new IOException("Unknown operation " + operation);
}
@@ -150,6 +149,10 @@ class BlobServerConnection extends Thread {
/**
* Handles an incoming GET request from a BLOB client.
*
+ * <p>Transient BLOB files are deleted once the client acknowledges a successful read. Note
+ * that we do not enforce atomicity here, i.e. multiple clients reading from the same BLOB may
+ * still succeed.
+ *
* @param inputStream
* the input stream to read incoming data from
* @param outputStream
@@ -173,7 +176,6 @@ class BlobServerConnection extends Thread {
final File blobFile;
final JobID jobId;
final BlobKey blobKey;
- final boolean permanentBlob;
try {
// read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
@@ -182,25 +184,21 @@ class BlobServerConnection extends Thread {
throw new EOFException("Premature end of GET request");
}
- // Receive the
- if (mode == CONTENT_NO_JOB) {
+ // Receive the jobId and key
+ if (mode == JOB_UNRELATED_CONTENT) {
jobId = null;
- permanentBlob = false;
- } else if (mode == CONTENT_FOR_JOB_HA) {
+ } else if (mode == JOB_RELATED_CONTENT) {
byte[] jidBytes = new byte[JobID.SIZE];
readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
jobId = JobID.fromByteArray(jidBytes);
- permanentBlob = true;
- } else if (mode == CONTENT_FOR_JOB) {
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
- jobId = JobID.fromByteArray(jidBytes);
- permanentBlob = false;
} else {
throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
}
blobKey = BlobKey.readFromInputStream(inputStream);
+ checkArgument(blobKey instanceof TransientBlobKey || jobId != null,
+ "Invalid BLOB addressing for permanent BLOBs");
+
if (LOG.isDebugEnabled()) {
LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
blobKey, clientSocket.getInetAddress());
@@ -212,7 +210,7 @@ class BlobServerConnection extends Thread {
// up to here, an error can give a good message
}
catch (Throwable t) {
- LOG.error("GET operation failed", t);
+ LOG.error("GET operation from {} failed.", clientSocket.getInetAddress(), t);
try {
writeErrorToStream(outputStream, t);
}
@@ -224,56 +222,73 @@ class BlobServerConnection extends Thread {
return;
}
- readLock.lock();
-
try {
- // copy the file to local store if it does not exist yet
+
+ readLock.lock();
try {
- blobServer.getFileInternal(jobId, blobKey, permanentBlob, blobFile);
+ // copy the file to local store if it does not exist yet
+ try {
+ blobServer.getFileInternal(jobId, blobKey, blobFile);
- // enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
- if (blobFile.length() > Integer.MAX_VALUE) {
- throw new IOException("BLOB size exceeds the maximum size (2 GB).");
- }
+ // enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
+ if (blobFile.length() > Integer.MAX_VALUE) {
+ throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+ }
- outputStream.write(RETURN_OKAY);
- } catch (Throwable t) {
- LOG.error("GET operation failed", t);
- try {
- writeErrorToStream(outputStream, t);
+ outputStream.write(RETURN_OKAY);
+ } catch (Throwable t) {
+ LOG.error("GET operation failed for BLOB {}/{} from {}.", jobId,
+ blobKey, clientSocket.getInetAddress(), t);
+ try {
+ writeErrorToStream(outputStream, t);
+ } catch (IOException e) {
+ // we are already handling an exception and could not even send the error
+ // to the client; there is nothing more we can do, so ignore this
+ }
+ clientSocket.close();
+ return;
}
- catch (IOException e) {
- // since we are in an exception case, it means that we could not send the error
- // ignore this
+
+ // from here on, we started sending data, so all we can do is close the connection when something happens
+ int blobLen = (int) blobFile.length();
+ writeLength(blobLen, outputStream);
+
+ try (FileInputStream fis = new FileInputStream(blobFile)) {
+ int bytesRemaining = blobLen;
+ while (bytesRemaining > 0) {
+ int read = fis.read(buf);
+ if (read < 0) {
+ throw new IOException("Premature end of BLOB file stream for " +
+ blobFile.getAbsolutePath());
+ }
+ outputStream.write(buf, 0, read);
+ bytesRemaining -= read;
+ }
}
- clientSocket.close();
- return;
+ } finally {
+ readLock.unlock();
}
- // from here on, we started sending data, so all we can do is close the connection when something happens
- int blobLen = (int) blobFile.length();
- writeLength(blobLen, outputStream);
-
- try (FileInputStream fis = new FileInputStream(blobFile)) {
- int bytesRemaining = blobLen;
- while (bytesRemaining > 0) {
- int read = fis.read(buf);
- if (read < 0) {
- throw new IOException("Premature end of BLOB file stream for " + blobFile.getAbsolutePath());
- }
- outputStream.write(buf, 0, read);
- bytesRemaining -= read;
+ // on successful transfer, delete transient files
+ int result = inputStream.read();
+ if (result < 0) {
+ throw new EOFException("Premature end of GET request");
+ } else if (blobKey instanceof TransientBlobKey && result == RETURN_OKAY) {
+ // a failed deletion is only logged; the client has already received the data
+ if (!blobServer.deleteInternal(jobId, (TransientBlobKey) blobKey)) {
+ LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", jobId,
+ blobKey, clientSocket.getInetAddress());
}
}
+
} catch (SocketException e) {
// happens when the other side disconnects
LOG.debug("Socket connection closed", e);
} catch (Throwable t) {
LOG.error("GET operation failed", t);
clientSocket.close();
- } finally {
- readLock.unlock();
}
+
}
/**
@@ -300,33 +315,40 @@ class BlobServerConnection extends Thread {
}
final JobID jobId;
- final boolean permanentBlob;
- if (mode == CONTENT_NO_JOB) {
+ if (mode == JOB_UNRELATED_CONTENT) {
jobId = null;
- permanentBlob = false;
- } else if (mode == CONTENT_FOR_JOB_HA) {
+ } else if (mode == JOB_RELATED_CONTENT) {
byte[] jidBytes = new byte[JobID.SIZE];
readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
jobId = JobID.fromByteArray(jidBytes);
- permanentBlob = true;
- } else if (mode == CONTENT_FOR_JOB) {
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
- jobId = JobID.fromByteArray(jidBytes);
- permanentBlob = false;
} else {
throw new IOException("Unknown type of BLOB addressing.");
}
+ final BlobKey.BlobType blobType;
+ {
+ final int read = inputStream.read();
+ if (read < 0) {
+ throw new EOFException("Read an incomplete BLOB type");
+ } else if (read == TRANSIENT_BLOB.ordinal()) {
+ blobType = TRANSIENT_BLOB;
+ } else if (read == PERMANENT_BLOB.ordinal()) {
+ blobType = PERMANENT_BLOB;
+ checkArgument(jobId != null, "Invalid BLOB addressing for permanent BLOBs");
+ } else {
+ throw new IOException("Invalid data received for the BLOB type: " + read);
+ }
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId,
clientSocket.getInetAddress());
}
incomingFile = blobServer.createTemporaryFilename();
- BlobKey blobKey = readFileFully(inputStream, incomingFile, buf);
+ BlobKey blobKey = readFileFully(inputStream, incomingFile, buf, blobType);
- blobServer.moveTempFileToStore(incomingFile, jobId, blobKey, permanentBlob);
+ blobServer.moveTempFileToStore(incomingFile, jobId, blobKey);
// Return computed key to client for validation
outputStream.write(RETURN_OKAY);
@@ -365,6 +387,8 @@ class BlobServerConnection extends Thread {
* file to write to
* @param buf
* An auxiliary buffer for data serialization/deserialization
+ * @param blobType
+ * whether to make the data permanent or transient
*
* @return the received file's content hash as a BLOB key
*
@@ -372,7 +396,7 @@ class BlobServerConnection extends Thread {
* thrown if an I/O error occurs while reading/writing data from/to the respective streams
*/
private static BlobKey readFileFully(
- final InputStream inputStream, final File incomingFile, final byte[] buf)
+ final InputStream inputStream, final File incomingFile, final byte[] buf, BlobKey.BlobType blobType)
throws IOException {
MessageDigest md = BlobUtils.createMessageDigest();
@@ -393,59 +417,7 @@ class BlobServerConnection extends Thread {
md.update(buf, 0, bytesExpected);
}
- return new BlobKey(md.digest());
- }
- }
-
- /**
- * Handles an incoming DELETE request from a BLOB client.
- *
- * @param inputStream
- * The input stream to read the request from.
- * @param outputStream
- * The output stream to write the response to.
- *
- * @throws IOException
- * Thrown if an I/O error occurs while reading the request data from the input stream.
- */
- private void delete(InputStream inputStream, OutputStream outputStream) throws IOException {
-
- try {
- // read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
- final int mode = inputStream.read();
- if (mode < 0) {
- throw new EOFException("Premature end of DELETE request");
- }
-
- final JobID jobId;
- if (mode == CONTENT_NO_JOB) {
- jobId = null;
- } else if (mode == CONTENT_FOR_JOB) {
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
- jobId = JobID.fromByteArray(jidBytes);
- } else {
- throw new IOException("Unknown type of BLOB addressing.");
- }
- BlobKey key = BlobKey.readFromInputStream(inputStream);
-
- if (!blobServer.deleteInternal(jobId, key)) {
- LOG.error("DELETE operation failed");
- writeErrorToStream(outputStream, null);
- } else {
- outputStream.write(RETURN_OKAY);
- }
- }
- catch (Throwable t) {
- LOG.error("DELETE operation failed", t);
- try {
- writeErrorToStream(outputStream, t);
- }
- catch (IOException e) {
- // since we are in an exception case, it means not much that we could not send the error
- // ignore this
- }
- clientSocket.close();
+ return BlobKey.createKey(blobType, md.digest());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
index 35fca14..5c9c7b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -26,15 +26,20 @@ public class BlobServerProtocol {
/** The buffer size in bytes for network transfers. */
static final int BUFFER_SIZE = 65536; // 64 K
- /** Internal code to identify a PUT operation. */
+ /**
+ * Internal code to identify a PUT operation.
+ *
+ * <p>Note: previously, there was also <tt>DELETE_OPERATION</tt> (code <tt>2</tt>).
+ */
static final byte PUT_OPERATION = 0;
- /** Internal code to identify a GET operation. */
+ /**
+ * Internal code to identify a GET operation.
+ *
+ * <p>Note: previously, there was also <tt>DELETE_OPERATION</tt> (code <tt>2</tt>).
+ */
static final byte GET_OPERATION = 1;
- /** Internal code to identify a DELETE operation. */
- static final byte DELETE_OPERATION = 2;
-
/** Internal code to identify a successful operation. */
static final byte RETURN_OKAY = 0;
@@ -42,22 +47,18 @@ public class BlobServerProtocol {
static final byte RETURN_ERROR = 1;
/**
- * Internal code to identify a job-unrelated transient BLOB.
- * <p>
- * Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
- * <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
- */
- static final byte CONTENT_NO_JOB = 0;
-
- /**
- * Internal code to identify a job-related transient BLOB.
+ * Internal code to identify job-unrelated BLOBs (only for transient BLOBs!).
+ *
+ * <p>Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>).
*/
- static final byte CONTENT_FOR_JOB = 3;
+ static final byte JOB_UNRELATED_CONTENT = 0;
/**
- * Internal code to identify a job-related permanent BLOB.
+ * Internal code to identify job-related (permanent or transient) BLOBs.
+ *
+ * <p>Note: This is equal to the previous <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
*/
- static final byte CONTENT_FOR_JOB_HA = 4;
+ static final byte JOB_RELATED_CONTENT = 2;
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index b643343..174499a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -30,14 +30,14 @@ public interface BlobService extends Closeable {
*
* @return BLOB service
*/
- PermanentBlobService getPermanentBlobStore();
+ PermanentBlobService getPermanentBlobService();
/**
* Returns a BLOB service for accessing transient BLOBs.
*
* @return BLOB service
*/
- TransientBlobService getTransientBlobStore();
+ TransientBlobService getTransientBlobService();
/**
* Returns the port of the BLOB server that this BLOB service is working with.
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index ebae4f4..d8223c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -462,7 +462,7 @@ public class BlobUtils {
"BlobServer use the same storage directory.");
// we cannot be sure at this point whether the file has already been uploaded to the blob
// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
- // persist the blob. Otherwise we might not be able to recover the job.
+ // to persist the blob. Otherwise we might not be able to recover the job.
}
if (blobStore != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 1f9af03..4fed4cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
+import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -113,8 +114,8 @@ public class FileSystemBlobStore implements BlobStoreService {
}
// verify that file contents are correct
- final BlobKey computedKey = new BlobKey(md.digest());
- if (!computedKey.equals(blobKey)) {
+ final byte[] computedKey = md.digest();
+ if (!Arrays.equals(computedKey, blobKey.getHash())) {
throw new IOException("Detected data corruption during transfer");
}