You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2018/08/01 19:18:24 UTC
hadoop git commit: HDFS-13421. [PROVIDED Phase 2] Implement
DNA_BACKUP command in Datanode. Contributed by Ewan Higgs.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-12090 d52a2afbf -> bcd1aaab3
HDFS-13421. [PROVIDED Phase 2] Implement DNA_BACKUP command in Datanode. Contributed by Ewan Higgs.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcd1aaab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcd1aaab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcd1aaab
Branch: refs/heads/HDFS-12090
Commit: bcd1aaab3d84deee9ba020a143a1e3dcf95cfce4
Parents: d52a2af
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Wed Aug 1 12:13:31 2018 -0700
Committer: Virajith Jalaparti <vi...@apache.org>
Committed: Wed Aug 1 12:13:31 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdfs/BlockInputStream.java | 52 ++++++++
.../hdfs/server/datanode/BPOfferService.java | 6 +
.../hadoop/hdfs/server/datanode/DataNode.java | 20 ++-
.../SyncServiceSatisfierDatanodeWorker.java | 97 +++++++++++++++
.../SyncTaskExecutionFeedbackCollector.java | 54 ++++++++
.../executor/BlockSyncOperationExecutor.java | 122 +++++++++++++++++++
.../executor/BlockSyncReaderFactory.java | 92 ++++++++++++++
.../executor/BlockSyncTaskRunner.java | 69 +++++++++++
.../hadoop/fs/TestHDFSMultipartUploader.java | 3 +-
.../hadoop/hdfs/TestBlockInputStream.java | 84 +++++++++++++
.../TestBlockSyncOperationExecutor.java | 94 ++++++++++++++
11 files changed, 690 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java
new file mode 100644
index 0000000..152f83e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Adapts a {@link BlockReader} to the {@link InputStream} API so that a block
+ * can be handed to code that consumes plain streams.
+ *
+ * <p>This class does not override {@code close()}, so closing the stream does
+ * not close the wrapped reader; whoever created the reader must release it.
+ */
+public class BlockInputStream extends InputStream {
+  private final BlockReader blockReader;
+
+  /**
+   * @param blockReader reader that supplies the bytes returned by this stream
+   */
+  public BlockInputStream(BlockReader blockReader) {
+    this.blockReader = blockReader;
+  }
+
+  /**
+   * Reads a single byte from the wrapped reader.
+   *
+   * @return the byte as an unsigned value in the range 0-255, or -1 when the
+   *         reader delivered no data
+   * @throws IOException if the wrapped reader fails
+   */
+  @Override
+  public int read() throws IOException {
+    byte[] single = new byte[1];
+    int count = blockReader.read(single, 0, single.length);
+    if (count <= 0) {
+      return -1;
+    }
+    // Mask off the sign extension: returning the raw byte would produce
+    // negative values for bytes >= 0x80, and 0xFF would be mistaken for the
+    // end-of-stream marker.
+    return single[0] & 0xff;
+  }
+
+  /**
+   * Delegates a bulk read directly to the wrapped reader.
+   *
+   * @return whatever the wrapped reader's read returns for the same arguments
+   * @throws IOException if the wrapped reader fails
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return blockReader.read(b, off, len);
+  }
+
+  /**
+   * Delegates to the wrapped reader's skip.
+   *
+   * @return the number of bytes the wrapped reader reports as skipped
+   * @throws IOException if the wrapped reader fails
+   */
+  @Override
+  public long skip(long n) throws IOException {
+    return blockReader.skip(n);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index a25f6a9..b8eef5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -795,6 +795,12 @@ class BPOfferService {
((BlockECReconstructionCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
break;
+ case DatanodeProtocol.DNA_BACKUP:
+ LOG.info("DatanodeCommand action: DNA_BACKUP");
+ Collection<BlockSyncTask> backupTasks =
+ ((SyncCommand) cmd).getSyncTasks();
+ dn.getSyncServiceSatisfierDatanodeWorker().processSyncTasks(backupTasks);
+ break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1e9c57a..8c675d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -386,6 +386,7 @@ public class DataNode extends ReconfigurableBase
private String dnUserName = null;
private BlockRecoveryWorker blockRecoveryWorker;
private ErasureCodingWorker ecWorker;
+ private SyncServiceSatisfierDatanodeWorker syncServiceSatisfierDatanodeWorker;
private final Tracer tracer;
private final TracerConfigurationManager tracerConfigurationManager;
private static final int NUM_CORES = Runtime.getRuntime()
@@ -1425,6 +1426,9 @@ public class DataNode extends ReconfigurableBase
ecWorker = new ErasureCodingWorker(getConf(), this);
blockRecoveryWorker = new BlockRecoveryWorker(this);
+ syncServiceSatisfierDatanodeWorker =
+ new SyncServiceSatisfierDatanodeWorker(getConf(), this);
+ syncServiceSatisfierDatanodeWorker.start();
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(getConf());
@@ -1976,7 +1980,12 @@ public class DataNode extends ReconfigurableBase
}
}
}
-
+
+ // stop syncServiceSatisfierDatanodeWorker
+ if (syncServiceSatisfierDatanodeWorker != null) {
+ syncServiceSatisfierDatanodeWorker.stop();
+ }
+
List<BPOfferService> bposArray = (this.blockPoolManager == null)
? new ArrayList<BPOfferService>()
: this.blockPoolManager.getAllNamenodeThreads();
@@ -2129,6 +2138,11 @@ public class DataNode extends ReconfigurableBase
notifyAll();
}
tracer.close();
+
+ // Waiting to finish backup SPS worker thread.
+ if (syncServiceSatisfierDatanodeWorker != null) {
+ syncServiceSatisfierDatanodeWorker.waitToFinishWorkerThread();
+ }
}
/**
@@ -3616,4 +3630,8 @@ public class DataNode extends ReconfigurableBase
}
return this.diskBalancer;
}
+
+ /**
+  * Returns the worker held in the syncServiceSatisfierDatanodeWorker field.
+  * NOTE(review): the shutdown code null-checks this field, so callers should
+  * presumably be prepared for a null return - confirm.
+  */
+ public SyncServiceSatisfierDatanodeWorker
+     getSyncServiceSatisfierDatanodeWorker() {
+   return this.syncServiceSatisfierDatanodeWorker;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java
new file mode 100644
index 0000000..7216e8f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.syncservice.SyncTaskExecutionFeedbackCollector;
+import org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncOperationExecutor;
+import org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncReaderFactory;
+import org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncTaskRunner;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class SyncServiceSatisfierDatanodeWorker {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SyncServiceSatisfierDatanodeWorker.class);
+
+ private ExecutorService executorService;
+ private BlockSyncOperationExecutor syncOperationExecutor;
+ private SyncTaskExecutionFeedbackCollector syncTaskExecutionFeedbackCollector;
+
+ public SyncServiceSatisfierDatanodeWorker(Configuration conf, DataNode dataNode) throws IOException {
+ this.executorService = HadoopExecutors.newFixedThreadPool(4);
+ this.syncOperationExecutor =
+ BlockSyncOperationExecutor.createOnDataNode(conf,
+ (locatedBlock, config) -> {
+ try {
+ return BlockSyncReaderFactory.createBlockReader(dataNode, locatedBlock, config);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+ this.syncTaskExecutionFeedbackCollector = new SyncTaskExecutionFeedbackCollector();
+ }
+
+
+ public void start() {
+ this.executorService = HadoopExecutors.newFixedThreadPool(4);
+ }
+
+ public void stop() {
+ this.executorService.shutdown();
+ }
+
+ public void waitToFinishWorkerThread() {
+ try {
+ this.executorService.awaitTermination(3, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ LOG.warn("SyncServiceSatisfierDatanodeWorker interrupted during waiting for finalization.");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void processSyncTasks(Collection<BlockSyncTask> blockSyncTasks) {
+
+ LOG.debug("Received SyncTasks: {}", blockSyncTasks);
+ for (BlockSyncTask blockSyncTask : blockSyncTasks) {
+ try {
+ executorService.submit(new BlockSyncTaskRunner(blockSyncTask,
+ syncOperationExecutor,
+ syncTaskExecutionFeedback -> syncTaskExecutionFeedbackCollector
+ .addFeedback(syncTaskExecutionFeedback)));
+ } catch (RejectedExecutionException e) {
+ LOG.warn("BlockSyncTask {} for {} was rejected: {}",
+ blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI(),
+ e.getCause());
+ }
+ }
+ }
+
+ public SyncTaskExecutionFeedbackCollector getSyncTaskExecutionFeedbackCollector() {
+ return syncTaskExecutionFeedbackCollector;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java
new file mode 100644
index 0000000..41cd441
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
+
+import java.util.List;
+
+/**
+ * DatanodeSyncTaskExecutionFeedbackCollector collects feedback for the
+ * sync service tracker to determine what has happened and report statistics.
+ */
+public class SyncTaskExecutionFeedbackCollector {
+
+ private List<BlockSyncTaskExecutionFeedback> collectedFeedback;
+
+ public SyncTaskExecutionFeedbackCollector() {
+ this.collectedFeedback = Lists.newArrayList();
+ }
+
+ public void addFeedback(BlockSyncTaskExecutionFeedback feedback) {
+ synchronized (this) {
+ collectedFeedback.add(feedback);
+ }
+ }
+
+ public BulkSyncTaskExecutionFeedback packageFeedbackForHeartbeat() {
+
+ List<BlockSyncTaskExecutionFeedback> feedbackForHeartbeat;
+
+ synchronized (this) {
+ feedbackForHeartbeat = collectedFeedback;
+ collectedFeedback = Lists.newArrayList();
+ }
+ return new BulkSyncTaskExecutionFeedback(feedbackForHeartbeat);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java
new file mode 100644
index 0000000..7fde230
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockInputStream;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * BlockSyncOperationExecutor writes the blocks to the sync service remote
+ * endpoint.
+ */
+public class BlockSyncOperationExecutor {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockSyncOperationExecutor.class);
+
+  private final Configuration conf;
+  private final BiFunction<LocatedBlock, Configuration, BlockReader>
+      createBlockReader;
+  private final Function<FileSystem, MultipartUploader>
+      multipartUploaderSupplier;
+
+  @VisibleForTesting
+  BlockSyncOperationExecutor(Configuration conf,
+      BiFunction<LocatedBlock, Configuration, BlockReader> createBlockReader,
+      Function<FileSystem, MultipartUploader> multipartUploaderSupplier) {
+    this.conf = conf;
+    this.createBlockReader = createBlockReader;
+    this.multipartUploaderSupplier = multipartUploaderSupplier;
+  }
+
+  /**
+   * Builds an executor that obtains its multipart uploader from
+   * {@link MultipartUploaderFactory}.
+   *
+   * @param conf configuration used for file systems, uploaders and readers
+   * @param createBlockReader creates a reader for a located block
+   * @return the executor
+   */
+  public static BlockSyncOperationExecutor createOnDataNode(Configuration conf,
+      BiFunction<LocatedBlock, Configuration, BlockReader> createBlockReader) {
+    return new BlockSyncOperationExecutor(conf,
+        createBlockReader,
+        fs -> {
+          try {
+            return MultipartUploaderFactory.get(fs, conf);
+          } catch (IOException e) {
+            // Function cannot throw checked exceptions.
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  /**
+   * Uploads the blocks of the given task as one part of a multipart upload.
+   *
+   * @param blockSyncTask task describing the blocks, target and part number
+   * @return the part handle bytes and the length that was requested
+   * @throws Exception if reading the blocks or uploading the part fails
+   */
+  public SyncTaskExecutionResult execute(BlockSyncTask blockSyncTask)
+      throws Exception {
+    LOG.info("Executing BlockSyncTask {} (on {})",
+        blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI());
+
+    return doMultiPartPart(
+        blockSyncTask.getRemoteURI(),
+        blockSyncTask.getLocatedBlocks(),
+        blockSyncTask.getPartNumber(),
+        blockSyncTask.getUploadHandle(),
+        blockSyncTask.getOffset(),
+        blockSyncTask.getLength());
+  }
+
+  /**
+   * Concatenates the blocks into one stream, positioned at {@code offset}
+   * inside the first block, and puts it as part {@code partNumber}. Every
+   * block reader opened here is closed before returning, also on failure.
+   */
+  private SyncTaskExecutionResult doMultiPartPart(URI uri,
+      List<LocatedBlock> locatedBlocks, int partNumber, byte[] uploadHandle,
+      int offset, long length) throws IOException {
+    FileSystem fs = FileSystem.get(uri, conf);
+    Path filePath = new Path(uri);
+    List<BlockReader> readers = new Vector<>(locatedBlocks.size());
+    try {
+      Vector<InputStream> inputStreams = new Vector<>(locatedBlocks.size());
+      for (int i = 0; i < locatedBlocks.size(); ++i) {
+        BlockReader reader =
+            createBlockReader.apply(locatedBlocks.get(i), conf);
+        // Track the reader first so it is released even if skipping fails.
+        readers.add(reader);
+        if (i == 0) {
+          skipFully(reader, offset);
+        }
+        inputStreams.add(new BlockInputStream(reader));
+      }
+      SequenceInputStream inputStream =
+          new SequenceInputStream(inputStreams.elements());
+      MultipartUploader mpu = multipartUploaderSupplier.apply(fs);
+      PartHandle partHandle = mpu.putPart(filePath, inputStream,
+          partNumber, BBUploadHandle.from(ByteBuffer.wrap(uploadHandle)),
+          length);
+      return new SyncTaskExecutionResult(partHandle.bytes(), length);
+    } finally {
+      // BlockInputStream does not close its reader, so closing the stream
+      // would not release the readers; close them explicitly.
+      closeReaders(readers);
+    }
+  }
+
+  /**
+   * Skips exactly {@code toSkip} bytes; a single skip call may skip fewer
+   * bytes than asked for, which would silently shift the uploaded data.
+   */
+  private static void skipFully(BlockReader reader, long toSkip)
+      throws IOException {
+    long remaining = toSkip;
+    while (remaining > 0) {
+      long skipped = reader.skip(remaining);
+      if (skipped <= 0) {
+        throw new IOException("Could not skip to offset " + toSkip + ": "
+            + remaining + " bytes left to skip");
+      }
+      remaining -= skipped;
+    }
+  }
+
+  /** Closes every reader, logging (not propagating) close failures. */
+  private static void closeReaders(List<BlockReader> readers) {
+    for (BlockReader reader : readers) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close block reader", e);
+      }
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java
new file mode 100644
index 0000000..cc5eb5c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsTracer;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.EnumSet;
+
+/**
+ * BlockSyncReaderFactory constructs a block reader in the Datanode for the
+ * Sync Command to read blocks that will be written to the synchronization
+ * remote endpoint.
+ */
+public class BlockSyncReaderFactory {
+
+ public static BlockReader createBlockReader(DataNode dataNode,
+ LocatedBlock locatedBlock, Configuration conf) throws IOException {
+ ClientContext clientContext = ClientContext.getFromConf(conf);
+ Token<BlockTokenIdentifier> accessToken = dataNode.getBlockAccessToken(
+ locatedBlock.getBlock(),
+ EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
+ locatedBlock.getStorageTypes(), locatedBlock.getStorageIDs());
+
+ DatanodeInfo datanodeInfo = locatedBlock.getLocations()[0];
+
+ Socket socked = NetUtils.getDefaultSocketFactory(conf).createSocket();
+ InetSocketAddress resolvedAddress =
+ datanodeInfo.getResolvedAddress();
+ socked.connect(resolvedAddress);
+
+ return new BlockReaderFactory(new DfsClientConf(conf))
+ .setConfiguration(conf)
+ .setBlock(locatedBlock.getBlock())
+ .setBlockToken(accessToken)
+ .setStartOffset(0)
+ .setLength(locatedBlock.getBlock().getNumBytes())
+ .setInetSocketAddress(datanodeInfo.getResolvedAddress())
+ .setVerifyChecksum(true)
+ .setDatanodeInfo(datanodeInfo)
+ .setClientName("BlockSyncOperationExecutor")
+ .setCachingStrategy(CachingStrategy.newDefaultStrategy())
+ .setRemotePeerFactory((addr, blockToken, datanodeId) -> {
+ Peer peer = null;
+ Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+ try {
+ sock.connect(addr, HdfsConstants.READ_TIMEOUT);
+ sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+ peer = DFSUtilClient.peerFromSocket(sock);
+ } finally {
+ if (peer == null) {
+ IOUtils.closeQuietly(sock);
+ }
+ }
+ return peer;
+ })
+ .setClientCacheContext(clientContext)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java
new file mode 100644
index 0000000..660e39e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Consumer;
+
+/**
+ * BlockSyncTaskRunner glues together the sync task and the feedback reporting:
+ * it executes one task and publishes either a success or a failure feedback
+ * through the supplied callback.
+ */
+public class BlockSyncTaskRunner implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockSyncTaskRunner.class);
+
+  private final BlockSyncTask blockSyncTask;
+  private final BlockSyncOperationExecutor syncOperationExecutor;
+
+  private final Consumer<BlockSyncTaskExecutionFeedback>
+      publishOutcomeCallback;
+
+  /**
+   * @param blockSyncTask task to execute
+   * @param syncOperationExecutor executor that performs the task
+   * @param publishOutcomeCallback receives the feedback for the task
+   */
+  public BlockSyncTaskRunner(BlockSyncTask blockSyncTask,
+      BlockSyncOperationExecutor syncOperationExecutor,
+      Consumer<BlockSyncTaskExecutionFeedback> publishOutcomeCallback) {
+    this.blockSyncTask = blockSyncTask;
+    this.syncOperationExecutor = syncOperationExecutor;
+    this.publishOutcomeCallback = publishOutcomeCallback;
+  }
+
+  /**
+   * Executes the task. Any exception is logged and reported as a failure
+   * feedback instead of being propagated to the executing thread.
+   */
+  @Override
+  public void run() {
+    LOG.info("Executing BlockSyncTask {} (on {})",
+        blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI());
+    try {
+      SyncTaskExecutionResult result =
+          syncOperationExecutor.execute(blockSyncTask);
+      publishOutcomeCallback.accept(BlockSyncTaskExecutionFeedback
+          .finishedSuccessfully(blockSyncTask.getSyncTaskId(),
+              blockSyncTask.getSyncMountId(),
+              result));
+    } catch (Exception e) {
+      // Trailing throwable argument makes SLF4J log the stack trace.
+      LOG.error("Exception executing BlockSyncTask {} (on {})",
+          blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI(), e);
+      publishOutcomeCallback.accept(BlockSyncTaskExecutionFeedback
+          .failedWithException(blockSyncTask.getSyncTaskId(),
+              blockSyncTask.getSyncMountId(), e));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
index 96c5093..4c36017 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
@@ -41,8 +41,7 @@ public class TestHDFSMultipartUploader
@BeforeClass
public static void init() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
- cluster = new MiniDFSCluster.Builder(conf,
- GenericTestUtils.getRandomizedTestDir())
+ cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.build();
cluster.waitClusterUp();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java
new file mode 100644
index 0000000..43d4881
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test the BlockInputStream facade.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestBlockInputStream {
+  @Mock
+  private BlockReader blockReaderMock;
+
+  /** A single-byte read issues exactly one one-byte read on the reader. */
+  @Test
+  public void testBlockInputStreamReadChar() {
+    BlockInputStream stream = new BlockInputStream(blockReaderMock);
+
+    try {
+      when(blockReaderMock.read(any(), eq(0), eq(1)))
+          .thenReturn(32);
+      // The mock only reports a count and never fills the buffer (making it
+      // write into the buffer is nasty), so the returned byte is not checked.
+      stream.read();
+      verify(blockReaderMock, times(1)).read(any(), eq(0), eq(1));
+    } catch (IOException e) {
+      fail("Could not even mock out read function.");
+    }
+  }
+
+  /** A bulk read is forwarded once, with the very same arguments. */
+  @Test
+  public void testBlockInputStreamReadBuf() {
+    BlockInputStream stream = new BlockInputStream(blockReaderMock);
+
+    try {
+      byte[] buffer = new byte[1024];
+      when(blockReaderMock.read(buffer, 0, buffer.length)).thenReturn(1024);
+      stream.read(buffer, 0, buffer.length);
+      verify(blockReaderMock, times(1)).read(buffer, 0, buffer.length);
+    } catch (IOException e) {
+      fail("Could not even mock out read function.");
+    }
+  }
+
+  /** skip is forwarded once and the reader's result is returned. */
+  @Test
+  public void testBlockInputStreamSkip() {
+    BlockInputStream stream = new BlockInputStream(blockReaderMock);
+
+    try {
+      when(blockReaderMock.skip(10)).thenReturn(10L);
+      long skipped = stream.skip(10);
+      assertEquals(10, skipped);
+      verify(blockReaderMock, times(1)).skip(10L);
+    } catch (IOException e) {
+      fail("Could not even mock out skip function.");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd1aaab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java
new file mode 100644
index 0000000..e16d086
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BBPartHandle;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link BlockSyncOperationExecutor} in which the block reader
+ * and the multipart uploader are replaced by Mockito mocks.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestBlockSyncOperationExecutor {
+
+  @Mock
+  private BlockReader blockReaderMock;
+
+  @Mock
+  private MultipartUploader multipartUploaderMock;
+
+  /**
+   * Executes a {@link BlockSyncTask} that covers a single block and checks
+   * that the result reports the block length and carries the bytes of the
+   * part handle returned by the mocked uploader.
+   *
+   * @throws Exception if building the task or executing it fails
+   */
+  @Test
+  public void executeMultipartPutFileSyncTask() throws Exception {
+    long blockLength = 42L;
+    Configuration conf = new Configuration();
+    BlockSyncOperationExecutor blockSyncOperationExecutor =
+        new BlockSyncOperationExecutor(conf,
+            (locatedBlock, config) -> blockReaderMock,
+            fs -> multipartUploaderMock);
+
+    // Encode with an explicit charset so the handle bytes do not depend on
+    // the platform default encoding.
+    byte[] uploadHandle = "uploadHandle".getBytes(StandardCharsets.UTF_8);
+    PartHandle partHandle = BBPartHandle.from(ByteBuffer.wrap(uploadHandle));
+    when(multipartUploaderMock.putPart(any(), any(), anyInt(), any(),
+        anyLong())).thenReturn(partHandle);
+
+    UUID syncTaskId = UUID.randomUUID();
+    URI remoteUri = new URI("remoteUri");
+    String syncMountId = "syncMountId";
+    Block block = new Block(42L, blockLength, 44L);
+    ExtendedBlock extendedBlock = new ExtendedBlock("poolId", block);
+    LocatedBlock locatedBlock = new LocatedBlock(extendedBlock, null);
+    List<LocatedBlock> locatedBlocks = Lists.newArrayList(locatedBlock);
+    int partNumber = 85;
+    final int offset = 0;
+    final long length = locatedBlock.getBlockSize();
+
+    BlockSyncTask blockSyncTask = new BlockSyncTask(syncTaskId, remoteUri,
+        locatedBlocks, partNumber, uploadHandle, offset, length, syncMountId);
+
+    SyncTaskExecutionResult result =
+        blockSyncOperationExecutor.execute(blockSyncTask);
+
+    assertThat(result).isNotNull();
+    assertThat(result.getNumberOfBytes()).isEqualTo(blockLength);
+    assertThat(result.getResult()).isEqualTo(partHandle.bytes());
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org