Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/01/16 19:55:03 UTC
hadoop git commit: Revert "HDFS-12794. Ozone: Parallelize ChunkOutputStream Writes to container. Contributed by Shashikant Banerjee."
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 6ce5ec676 -> 18f9fea7c
Revert "HDFS-12794. Ozone: Parallelize ChunkOutputSream Writes to container. Contributed by Shashikant Banerjee."
This reverts commit 6ce5ec676164b84a9e2f8dc65b5f2199a141506d.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/18f9fea7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/18f9fea7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/18f9fea7
Branch: refs/heads/HDFS-7240
Commit: 18f9fea7c42bbce0d6f6c3480ac1cd894261f358
Parents: 6ce5ec6
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Wed Jan 17 01:09:48 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Wed Jan 17 01:09:48 2018 +0530
----------------------------------------------------------------------
.../apache/hadoop/ozone/OzoneConfigKeys.java | 3 -
.../ozone/client/io/ChunkGroupOutputStream.java | 54 +---
.../hadoop/ozone/client/rpc/RpcClient.java | 10 -
.../hadoop/scm/storage/ChunkOutputStream.java | 257 ++++++-------------
.../scm/storage/ContainerProtocolCalls.java | 13 +-
.../web/storage/DistributedStorageHandler.java | 10 -
.../src/main/resources/ozone-default.xml | 9 -
.../hadoop/ozone/ksm/TestKeySpaceManager.java | 2 +-
8 files changed, 85 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index cb3f0f6..8059b5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -107,9 +107,6 @@ public final class OzoneConfigKeys {
"ozone.scm.block.size.in.mb";
public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256;
- public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB =
- "ozone.output.stream.buffer.size.in.mb";
- public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256;
/**
* Ozone administrator users delimited by comma.
* If not set, only the user who launches an ozone service will be the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index a44a009..fe248e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -19,10 +19,7 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
@@ -49,9 +46,6 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
-
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
*
@@ -78,7 +72,6 @@ public class ChunkGroupOutputStream extends OutputStream {
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
private final String requestID;
- private final long streamBufferSize;
/**
* A constructor for testing purpose only.
@@ -93,7 +86,6 @@ public class ChunkGroupOutputStream extends OutputStream {
xceiverClientManager = null;
chunkSize = 0;
requestID = null;
- streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
}
/**
@@ -113,26 +105,12 @@ public class ChunkGroupOutputStream extends OutputStream {
return streamEntries;
}
- /**
- * Chunkoutput stream, making this package visible since this can be
- * created only via builder.
- * @param handler - Open Key state.
- * @param xceiverClientManager - Communication Manager.
- * @param scmClient - SCM protocol Client.
- * @param ksmClient - KSM Protocol client
- * @param chunkSize - Chunk Size - I/O
- * @param requestId - Seed for trace ID generation.
- * @param factor - Replication factor
- * @param type - Replication Type - RATIS/Standalone etc.
- * @param maxBufferSize - Maximum stream buffer Size.
- * @throws IOException - Throws this exception if there is an error.
- */
- ChunkGroupOutputStream(
+ public ChunkGroupOutputStream(
OpenKeySession handler, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
int chunkSize, String requestId, ReplicationFactor factor,
- ReplicationType type, long maxBufferSize) throws IOException {
+ ReplicationType type) throws IOException {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.byteOffset = 0;
@@ -152,7 +130,6 @@ public class ChunkGroupOutputStream extends OutputStream {
this.requestID = requestId;
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationVersions().size());
- this.streamBufferSize = maxBufferSize;
}
/**
@@ -207,7 +184,7 @@ public class ChunkGroupOutputStream extends OutputStream {
}
streamEntries.add(new ChunkOutputStreamEntry(containerKey,
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
- chunkSize, subKeyInfo.getLength(), this.streamBufferSize));
+ chunkSize, subKeyInfo.getLength()));
}
@@ -347,7 +324,6 @@ public class ChunkGroupOutputStream extends OutputStream {
private String requestID;
private ReplicationType type;
private ReplicationFactor factor;
- private long streamBufferSize;
public Builder setHandler(OpenKeySession handler) {
this.openHandler = handler;
@@ -391,23 +367,9 @@ public class ChunkGroupOutputStream extends OutputStream {
return this;
}
- public Builder setStreamBufferSize(long blockSize) {
- this.streamBufferSize = blockSize;
- return this;
- }
-
public ChunkGroupOutputStream build() throws IOException {
- Preconditions.checkNotNull(openHandler);
- Preconditions.checkNotNull(xceiverManager);
- Preconditions.checkNotNull(scmClient);
- Preconditions.checkNotNull(ksmClient);
- Preconditions.checkState(chunkSize > 0);
- Preconditions.checkState(StringUtils.isNotEmpty(requestID));
- Preconditions
- .checkState(streamBufferSize > 0 && streamBufferSize > chunkSize);
-
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
- ksmClient, chunkSize, requestID, factor, type, streamBufferSize);
+ ksmClient, chunkSize, requestID, factor, type);
}
}
@@ -423,12 +385,11 @@ public class ChunkGroupOutputStream extends OutputStream {
private final long length;
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
- private long streamBufferSize; // Max block size.
ChunkOutputStreamEntry(String containerKey, String key,
XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
- long length, long streamBufferSize) {
+ long length) {
this.outputStream = null;
this.containerKey = containerKey;
this.key = key;
@@ -439,7 +400,6 @@ public class ChunkGroupOutputStream extends OutputStream {
this.length = length;
this.currentPosition = 0;
- this.streamBufferSize = streamBufferSize;
}
/**
@@ -458,8 +418,6 @@ public class ChunkGroupOutputStream extends OutputStream {
this.length = length;
this.currentPosition = 0;
- this.streamBufferSize =
- OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
}
long getLength() {
@@ -474,7 +432,7 @@ public class ChunkGroupOutputStream extends OutputStream {
if (this.outputStream == null) {
this.outputStream = new ChunkOutputStream(containerKey,
key, xceiverClientManager, xceiverClient,
- requestId, chunkSize, Ints.checkedCast(streamBufferSize));
+ requestId, chunkSize);
}
}
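----------------------------------------------------------------------
For background on the class modified above: ChunkGroupOutputStream keeps a list of fixed-length stream entries and forwards each write to the current entry, advancing to the next entry as each one fills. A simplified, self-contained sketch of that pattern follows (all names here are hypothetical stand-ins, not the actual Ozone classes):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Simplified model of a "group" stream that spreads writes across
// fixed-capacity entries, the way ChunkGroupOutputStream spreads a
// key's data across block-sized ChunkOutputStreamEntry objects.
class GroupedOutputStream extends OutputStream {
  private final List<ByteArrayOutputStream> entries = new ArrayList<>();
  private final int entryCapacity;   // analogous to an entry's length
  private int currentIndex = 0;
  private int bytesInCurrent = 0;

  GroupedOutputStream(int entryCapacity) {
    this.entryCapacity = entryCapacity;
    entries.add(new ByteArrayOutputStream());
  }

  @Override
  public void write(int b) throws IOException {
    if (bytesInCurrent == entryCapacity) {       // current entry full:
      entries.add(new ByteArrayOutputStream());  // advance to the next
      currentIndex++;
      bytesInCurrent = 0;
    }
    entries.get(currentIndex).write(b);
    bytesInCurrent++;
  }
}
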
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 20f2b54..94038e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -74,11 +74,6 @@ import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
-
/**
* Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode
* to execute client calls. This uses RPC protocol for communication
@@ -99,7 +94,6 @@ public class RpcClient implements ClientProtocol {
private final UserGroupInformation ugi;
private final OzoneAcl.OzoneACLRights userRights;
private final OzoneAcl.OzoneACLRights groupRights;
- private final long streamBufferSize;
/**
* Creates RpcClient instance with the given configuration.
@@ -154,9 +148,6 @@ public class RpcClient implements ClientProtocol {
} else {
chunkSize = configuredChunkSize;
}
- // streamBufferSize by default is set equal to default scm block size.
- streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
- OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
}
@Override
@@ -472,7 +463,6 @@ public class RpcClient implements ClientProtocol {
.setRequestID(requestId)
.setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
.setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
- .setStreamBufferSize(streamBufferSize)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
index 916a506..64c10da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
@@ -18,32 +18,22 @@
package org.apache.hadoop.scm.storage;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.util.Time;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
- .Result.SUCCESS;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
+import com.google.protobuf.ByteString;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
/**
* An {@link OutputStream} used by the REST service in combination with the
@@ -67,12 +57,12 @@ public class ChunkOutputStream extends OutputStream {
private final String key;
private final String traceID;
private final KeyData.Builder containerKeyData;
- private final String streamId;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private ByteBuffer buffer;
+ private final String streamId;
+ private int chunkIndex;
private int chunkSize;
- private int streamBufferSize;
/**
* Creates a new ChunkOutputStream.
@@ -83,18 +73,14 @@ public class ChunkOutputStream extends OutputStream {
* @param xceiverClient client to perform container calls
* @param traceID container protocol call args
* @param chunkSize chunk size
- * @param maxBufferSize -- Controls the maximum amount of memory that we need
- * to allocate data buffering.
*/
public ChunkOutputStream(String containerKey, String key,
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
- String traceID, int chunkSize, int maxBufferSize) {
+ String traceID, int chunkSize) {
this.containerKey = containerKey;
this.key = key;
this.traceID = traceID;
this.chunkSize = chunkSize;
- this.streamBufferSize = maxBufferSize;
-
KeyValue keyValue = KeyValue.newBuilder()
.setKey("TYPE").setValue("KEY").build();
this.containerKeyData = KeyData.newBuilder()
@@ -103,24 +89,22 @@ public class ChunkOutputStream extends OutputStream {
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
- this.buffer = ByteBuffer.allocate(maxBufferSize);
+ this.buffer = ByteBuffer.allocate(chunkSize);
this.streamId = UUID.randomUUID().toString();
+ this.chunkIndex = 0;
}
- /**
- * {@inheritDoc}
- */
@Override
public synchronized void write(int b) throws IOException {
checkOpen();
- byte[] c = new byte[1];
- c[0] = (byte) b;
- write(c, 0, 1);
+ int rollbackPosition = buffer.position();
+ int rollbackLimit = buffer.limit();
+ buffer.put((byte)b);
+ if (buffer.position() == chunkSize) {
+ flushBufferToChunk(rollbackPosition, rollbackLimit);
+ }
}
- /**
- * {@inheritDoc}
- */
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (b == null) {
@@ -134,90 +118,17 @@ public class ChunkOutputStream extends OutputStream {
return;
}
checkOpen();
- int rollbackPosition = buffer.position();
- int rollbackLimit = buffer.limit();
- try {
- List<ImmutablePair<CompletableFuture<ContainerProtos
- .ContainerCommandResponseProto>, ChunkInfo>>
- writeFutures = writeInParallel(b, off, len);
- // This is a rendezvous point for this function call, all chunk I/O
- // for this block must complete before we can declare this call as
- // complete.
-
- // Wait until all the futures complete or throws an exception if any of
- // the calls ended with an exception this call will throw.
- // if futures is null, it means that we wrote the data to the buffer and
- // returned.
- if (writeFutures != null) {
- CompletableFuture.allOf(writeFutures.toArray(new
- CompletableFuture[writeFutures.size()])).join();
-
- // Wrote this data, we will clear this buffer now.
- buffer.clear();
- }
- } catch (InterruptedException | ExecutionException e) {
- buffer.position(rollbackPosition);
- buffer.limit(rollbackLimit);
- throw new IOException("Unexpected error in write. ", e);
- }
- }
-
- /**
- * Write a given block into many small chunks in parallel.
- *
- * @param b
- * @param off
- * @param len
- * @throws IOException
- * @throws ExecutionException
- * @throws InterruptedException
- */
- public List<ImmutablePair<CompletableFuture<ContainerProtos
- .ContainerCommandResponseProto>, ChunkInfo>>
- writeInParallel(byte[] b, int off, int len)
- throws IOException, ExecutionException, InterruptedException {
-
- Preconditions.checkArgument(len <= streamBufferSize,
- "A chunk write cannot be " + "larger than max buffer size limit.");
- long newBlockCount = len / chunkSize;
- buffer.put(b, off, len);
- List<ImmutablePair<CompletableFuture<ContainerProtos
- .ContainerCommandResponseProto>, ChunkInfo>>
- writeFutures = new LinkedList<>();
-
- // We if must have at least a chunkSize of data ready to write, if so we
- // will go ahead and start writing that data.
- if (buffer.position() >= chunkSize) {
- // Allocate new byte slices which will point to each chunk of data
- // that we want to write. Divide the byte buffer into individual chunks
- // each of length equals to chunkSize max where each chunk will be
- // assigned a chunkId where, for each chunk the async write requests will
- // be made and wait for all of them to return before the write call
- // returns.
- for (int chunkId = 0; chunkId < newBlockCount; chunkId++) {
- // Please note : We are not flipping the slice when we write since
- // the slices are pointing the buffer start and end as needed for
- // the chunk write. Also please note, Duplicate does not create a
- // copy of data, it only creates metadata that points to the data
- // stream.
- ByteBuffer chunk = buffer.duplicate();
- Preconditions.checkState((chunkId * chunkSize) < buffer.limit(),
- "Chunk offset cannot be beyond the limits of the buffer.");
- chunk.position(chunkId * chunkSize);
- // Min handles the case where the last block might be lesser than
- // chunk Size.
- chunk.limit(chunk.position() +
- Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize)));
-
- // Schedule all the writes, this is a non-block call which returns
- // futures. We collect these futures and wait for all of them to
- // complete in the next line.
- writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize));
+ while (len > 0) {
+ int writeLen = Math.min(chunkSize - buffer.position(), len);
+ int rollbackPosition = buffer.position();
+ int rollbackLimit = buffer.limit();
+ buffer.put(b, off, writeLen);
+ if (buffer.position() == chunkSize) {
+ flushBufferToChunk(rollbackPosition, rollbackLimit);
}
- return writeFutures;
+ off += writeLen;
+ len -= writeLen;
}
- // Nothing to do , return null.
- return null;
}
@Override
@@ -226,19 +137,7 @@ public class ChunkOutputStream extends OutputStream {
if (buffer.position() > 0) {
int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit();
- ByteBuffer chunk = buffer.duplicate();
- try {
-
- ImmutablePair<CompletableFuture<ContainerProtos
- .ContainerCommandResponseProto>, ChunkInfo>
- result = writeChunkToContainer(chunk, 0, chunkSize);
- updateChunkInfo(result);
- buffer.clear();
- } catch (ExecutionException | InterruptedException e) {
- buffer.position(rollbackPosition);
- buffer.limit(rollbackLimit);
- throw new IOException("Failure in flush", e);
- }
+ flushBufferToChunk(rollbackPosition, rollbackLimit);
}
}
@@ -248,20 +147,10 @@ public class ChunkOutputStream extends OutputStream {
buffer != null) {
try {
if (buffer.position() > 0) {
- // This flip is needed since this is the real buffer to which we
- // are writing and position will have moved each time we did a put.
- buffer.flip();
-
- // Call get immediately to make this call Synchronous.
-
- ImmutablePair<CompletableFuture<ContainerProtos
- .ContainerCommandResponseProto>, ChunkInfo>
- result = writeChunkToContainer(buffer, 0, buffer.limit());
- updateChunkInfo(result);
- buffer.clear();
+ writeChunkToContainer();
}
putKey(xceiverClient, containerKeyData.build(), traceID);
- } catch (IOException | InterruptedException | ExecutionException e) {
+ } catch (IOException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
} finally {
@@ -274,24 +163,6 @@ public class ChunkOutputStream extends OutputStream {
}
- private void updateChunkInfo(
- ImmutablePair<
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
- ChunkInfo
- > result) throws InterruptedException, ExecutionException {
- // Wait for this call to complete.
- ContainerProtos.ContainerCommandResponseProto response =
- result.getLeft().get();
-
- // If the write call to the chunk is successful, we need to add that
- // chunk information to the containerKeyData.
- // TODO: Clean up the garbage in case of failure.
- if(response.getResult() == SUCCESS) {
- ChunkInfo chunk = result.getRight();
- containerKeyData.addChunks(chunk);
- }
- }
-
/**
* Checks if the stream is open. If not, throws an exception.
*
@@ -304,35 +175,53 @@ public class ChunkOutputStream extends OutputStream {
}
/**
+ * Attempts to flush buffered writes by writing a new chunk to the container.
+ * If successful, then clears the buffer to prepare to receive writes for a
+ * new chunk.
+ *
+ * @param rollbackPosition position to restore in buffer if write fails
+ * @param rollbackLimit limit to restore in buffer if write fails
+ * @throws IOException if there is an I/O error while performing the call
+ */
+ private synchronized void flushBufferToChunk(int rollbackPosition,
+ int rollbackLimit) throws IOException {
+ boolean success = false;
+ try {
+ writeChunkToContainer();
+ success = true;
+ } finally {
+ if (success) {
+ buffer.clear();
+ } else {
+ buffer.position(rollbackPosition);
+ buffer.limit(rollbackLimit);
+ }
+ }
+ }
+
+ /**
* Writes buffered data as a new chunk to the container and saves chunk
* information to be used later in putKey call.
*
- * @param data -- Data to write.
- * @param offset - offset to the data buffer
- * @param len - Length in bytes
- * @return Returns a Immutable pair -- A future object that will contian
- * the result of the operation, and the chunkInfo that we wrote.
- *
- * @throws IOException
- * @throws ExecutionException
- * @throws InterruptedException
+ * @throws IOException if there is an I/O error while performing the call
*/
- private ImmutablePair<
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
- ChunkInfo>
- writeChunkToContainer(ByteBuffer data, int offset, int len)
- throws IOException, ExecutionException, InterruptedException {
-
-
- ByteString dataString = ByteString.copyFrom(data);
- ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName(
+ private synchronized void writeChunkToContainer() throws IOException {
+ buffer.flip();
+ ByteString data = ByteString.copyFrom(buffer);
+ ChunkInfo chunk = ChunkInfo
+ .newBuilder()
+ .setChunkName(
DigestUtils.md5Hex(key) + "_stream_"
- + streamId + "_chunk_" + Time.monotonicNowNanos())
+ + streamId + "_chunk_" + ++chunkIndex)
.setOffset(0)
- .setLen(len)
+ .setLen(data.size())
.build();
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response =
- writeChunk(xceiverClient, chunk, key, dataString, traceID);
- return new ImmutablePair(response, chunk);
+ try {
+ writeChunk(xceiverClient, chunk, key, data, traceID);
+ } catch (IOException e) {
+ throw new IOException(
+ "Unexpected Storage Container Exception: " + e.toString(), e);
+ }
+ containerKeyData.addChunks(chunk);
}
}
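----------------------------------------------------------------------
The restored write path above is the heart of this revert: bytes accumulate in a ByteBuffer sized to one chunk; when the buffer fills, the chunk is flushed synchronously, and on failure the buffer's position and limit are rolled back so no partially-written bytes leak into the next chunk. A minimal standalone sketch of that buffer-and-rollback pattern (sendChunk is a stand-in for writeChunkToContainer, not the real container call):

import java.io.IOException;
import java.nio.ByteBuffer;

// Minimal model of the buffer-and-rollback pattern restored by this
// revert: fill a chunk-sized buffer, flush synchronously when full,
// and restore position/limit if the flush throws.
class BufferedChunkWriter {
  private final ByteBuffer buffer;

  BufferedChunkWriter(int chunkSize) {
    this.buffer = ByteBuffer.allocate(chunkSize);
  }

  void write(byte[] b, int off, int len) throws IOException {
    while (len > 0) {
      int writeLen = Math.min(buffer.remaining(), len);
      int rollbackPosition = buffer.position();
      int rollbackLimit = buffer.limit();
      buffer.put(b, off, writeLen);
      if (!buffer.hasRemaining()) {            // one full chunk buffered
        flushBuffer(rollbackPosition, rollbackLimit);
      }
      off += writeLen;
      len -= writeLen;
    }
  }

  private void flushBuffer(int rollbackPosition, int rollbackLimit)
      throws IOException {
    boolean success = false;
    try {
      buffer.flip();                           // expose buffered bytes
      sendChunk(buffer);                       // synchronous, may throw
      success = true;
    } finally {
      if (success) {
        buffer.clear();                        // ready for the next chunk
      } else {
        buffer.position(rollbackPosition);     // undo the partial put so
        buffer.limit(rollbackLimit);           // the caller can retry
      }
    }
  }

  // Stand-in for writeChunkToContainer(); the real code sends the
  // bytes to a datanode via ContainerProtocolCalls.writeChunk().
  private void sendChunk(ByteBuffer chunk) throws IOException {
    // no-op in this sketch
  }
}

This mirrors the shape of the post-revert write()/flushBufferToChunk()/writeChunkToContainer() trio in the hunks above: the rollback values are captured before each put, so a failed flush discards exactly the bytes of the attempted write.
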
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
index 7d4c72d..1cde67c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
@@ -53,9 +53,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
import org.apache.hadoop.scm.XceiverClientSpi;
/**
@@ -165,10 +162,9 @@ public final class ContainerProtocolCalls {
* @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call
*/
- public static CompletableFuture<ContainerCommandResponseProto> writeChunk(
- XceiverClientSpi xceiverClient, ChunkInfo chunk, String key,
- ByteString data, String traceID)
- throws IOException, ExecutionException, InterruptedException {
+ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
+ String key, ByteString data, String traceID)
+ throws IOException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -183,7 +179,8 @@ public final class ContainerProtocolCalls {
.setDatanodeID(id)
.setWriteChunk(writeChunkRequest)
.build();
- return xceiverClient.sendCommandAsync(request);
+ ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+ validateContainerResponse(response);
}
/**
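----------------------------------------------------------------------
The writeChunk change above is the protocol-level half of the revert: rather than returning a CompletableFuture from sendCommandAsync and letting the stream layer join futures later, the call now blocks on sendCommand and validates the response before returning. A generic sketch of both shapes, using stand-in types rather than the real container protocol classes:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Generic shape of the change: a fire-and-collect async call becomes
// a blocking call that validates its response before returning.
class BlockingCallSketch {

  static class Response {
    final boolean success;
    final String message;
    Response(boolean success, String message) {
      this.success = success;
      this.message = message;
    }
  }

  interface Transport {
    Response sendCommand(byte[] request) throws IOException;      // blocking
    CompletableFuture<Response> sendCommandAsync(byte[] request); // async
  }

  // Pre-revert shape: hand the future back; the stream layer joins them.
  static CompletableFuture<Response> writeAsync(Transport t, byte[] req) {
    return t.sendCommandAsync(req);
  }

  // Post-revert shape: block, validate, and surface failures as a
  // plain IOException instead of an ExecutionException from a future.
  static void writeBlocking(Transport t, byte[] req) throws IOException {
    Response response = t.sendCommand(req);
    if (!response.success) {
      throw new IOException("Container call failed: " + response.message);
    }
  }
}
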
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 137f8f9..1830c71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -67,11 +67,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
-
/**
* A {@link StorageHandler} implementation that distributes object storage
* across the nodes of an HDFS cluster.
@@ -91,7 +86,6 @@ public final class DistributedStorageHandler implements StorageHandler {
private final boolean useRatis;
private final OzoneProtos.ReplicationType type;
private final OzoneProtos.ReplicationFactor factor;
- private final long streamBufferSize;
/**
* Creates a new DistributedStorageHandler.
@@ -133,9 +127,6 @@ public final class DistributedStorageHandler implements StorageHandler {
chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
}
- // streamBufferSize by default is set to default scm block size.
- streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
- OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
}
@Override
@@ -427,7 +418,6 @@ public final class DistributedStorageHandler implements StorageHandler {
.setRequestID(args.getRequestID())
.setType(xceiverClientManager.getType())
.setFactor(xceiverClientManager.getFactor())
- .setStreamBufferSize(streamBufferSize)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 4df99f9..31c3901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -691,15 +691,6 @@
</description>
</property>
<property>
- <name>ozone.output.stream.buffer.size.in.mb</name>
- <value>256</value>
- <tag>OZONE</tag>
- <description>
- The maximum size of the buffer allocated for the ozone output stream for
- write. Default size is equals to scm block size.
- </description>
- </property>
- <property>
<name>ozone.scm.chunk.size</name>
<value>16777216</value>
<tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index fc4bedc..c8427f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -1114,7 +1114,7 @@ public class TestKeySpaceManager {
.getMetadataManager().getExpiredOpenKeys();
Assert.assertEquals(0, openKeys.size());
- //Thread.sleep(2000);
+ Thread.sleep(2000);
openKeys = cluster.getKeySpaceManager().getMetadataManager()
.getExpiredOpenKeys();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org