You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/01/16 19:55:03 UTC

hadoop git commit: Revert "HDFS-12794. Ozone: Parallelize ChunkOutputSream Writes to container. Contributed by Shashikant Banerjee."

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 6ce5ec676 -> 18f9fea7c


Revert "HDFS-12794. Ozone: Parallelize ChunkOutputSream Writes to container. Contributed by Shashikant Banerjee."

This reverts commit 6ce5ec676164b84a9e2f8dc65b5f2199a141506d.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/18f9fea7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/18f9fea7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/18f9fea7

Branch: refs/heads/HDFS-7240
Commit: 18f9fea7c42bbce0d6f6c3480ac1cd894261f358
Parents: 6ce5ec6
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Wed Jan 17 01:09:48 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Wed Jan 17 01:09:48 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |   3 -
 .../ozone/client/io/ChunkGroupOutputStream.java |  54 +---
 .../hadoop/ozone/client/rpc/RpcClient.java      |  10 -
 .../hadoop/scm/storage/ChunkOutputStream.java   | 257 ++++++-------------
 .../scm/storage/ContainerProtocolCalls.java     |  13 +-
 .../web/storage/DistributedStorageHandler.java  |  10 -
 .../src/main/resources/ozone-default.xml        |   9 -
 .../hadoop/ozone/ksm/TestKeySpaceManager.java   |   2 +-
 8 files changed, 85 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index cb3f0f6..8059b5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -107,9 +107,6 @@ public final class OzoneConfigKeys {
       "ozone.scm.block.size.in.mb";
   public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256;
 
-  public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB =
-      "ozone.output.stream.buffer.size.in.mb";
-  public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256;
   /**
    * Ozone administrator users delimited by comma.
    * If not set, only the user who launches an ozone service will be the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index a44a009..fe248e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -19,10 +19,7 @@ package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
@@ -49,9 +46,6 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
-
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
  *
@@ -78,7 +72,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
   private final String requestID;
-  private final long streamBufferSize;
 
   /**
    * A constructor for testing purpose only.
@@ -93,7 +86,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     xceiverClientManager = null;
     chunkSize = 0;
     requestID = null;
-    streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
   }
 
   /**
@@ -113,26 +105,12 @@ public class ChunkGroupOutputStream extends OutputStream {
     return streamEntries;
   }
 
-  /**
-   * Chunkoutput stream, making this package visible since this can be
-   * created only via builder.
-   * @param handler  - Open Key state.
-   * @param xceiverClientManager - Communication Manager.
-   * @param scmClient - SCM protocol Client.
-   * @param ksmClient - KSM Protocol client
-   * @param chunkSize - Chunk Size - I/O
-   * @param requestId - Seed for trace ID generation.
-   * @param factor - Replication factor
-   * @param type - Replication Type - RATIS/Standalone etc.
-   * @param maxBufferSize - Maximum stream buffer Size.
-   * @throws IOException - Throws this exception if there is an error.
-   */
-  ChunkGroupOutputStream(
+  public ChunkGroupOutputStream(
       OpenKeySession handler, XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
       KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
       int chunkSize, String requestId, ReplicationFactor factor,
-      ReplicationType type, long maxBufferSize) throws IOException {
+      ReplicationType type) throws IOException {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.byteOffset = 0;
@@ -152,7 +130,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.requestID = requestId;
     LOG.debug("Expecting open key with one block, but got" +
         info.getKeyLocationVersions().size());
-    this.streamBufferSize = maxBufferSize;
   }
 
   /**
@@ -207,7 +184,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     }
     streamEntries.add(new ChunkOutputStreamEntry(containerKey,
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
-        chunkSize, subKeyInfo.getLength(), this.streamBufferSize));
+        chunkSize, subKeyInfo.getLength()));
   }
 
 
@@ -347,7 +324,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     private String requestID;
     private ReplicationType type;
     private ReplicationFactor factor;
-    private long streamBufferSize;
 
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -391,23 +367,9 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
     }
 
-    public Builder setStreamBufferSize(long blockSize) {
-      this.streamBufferSize = blockSize;
-      return this;
-    }
-
     public ChunkGroupOutputStream build() throws IOException {
-      Preconditions.checkNotNull(openHandler);
-      Preconditions.checkNotNull(xceiverManager);
-      Preconditions.checkNotNull(scmClient);
-      Preconditions.checkNotNull(ksmClient);
-      Preconditions.checkState(chunkSize > 0);
-      Preconditions.checkState(StringUtils.isNotEmpty(requestID));
-      Preconditions
-          .checkState(streamBufferSize > 0 && streamBufferSize > chunkSize);
-
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          ksmClient, chunkSize, requestID, factor, type, streamBufferSize);
+          ksmClient, chunkSize, requestID, factor, type);
     }
   }
 
@@ -423,12 +385,11 @@ public class ChunkGroupOutputStream extends OutputStream {
     private final long length;
     // the current position of this stream 0 <= currentPosition < length
     private long currentPosition;
-    private long streamBufferSize; // Max block size.
 
     ChunkOutputStreamEntry(String containerKey, String key,
         XceiverClientManager xceiverClientManager,
         XceiverClientSpi xceiverClient, String requestId, int chunkSize,
-        long length, long streamBufferSize) {
+        long length) {
       this.outputStream = null;
       this.containerKey = containerKey;
       this.key = key;
@@ -439,7 +400,6 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
-      this.streamBufferSize = streamBufferSize;
     }
 
     /**
@@ -458,8 +418,6 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
-      this.streamBufferSize =
-          OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
     }
 
     long getLength() {
@@ -474,7 +432,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       if (this.outputStream == null) {
         this.outputStream = new ChunkOutputStream(containerKey,
             key, xceiverClientManager, xceiverClient,
-            requestId, chunkSize, Ints.checkedCast(streamBufferSize));
+            requestId, chunkSize);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 20f2b54..94038e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -74,11 +74,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
-
 /**
  * Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode
  * to execute client calls. This uses RPC protocol for communication
@@ -99,7 +94,6 @@ public class RpcClient implements ClientProtocol {
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
-  private final long streamBufferSize;
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -154,9 +148,6 @@ public class RpcClient implements ClientProtocol {
     } else {
       chunkSize = configuredChunkSize;
     }
-    // streamBufferSize by default is set equal to default scm block size.
-    streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
-        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
   }
 
   @Override
@@ -472,7 +463,6 @@ public class RpcClient implements ClientProtocol {
             .setRequestID(requestId)
             .setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
             .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
-            .setStreamBufferSize(streamBufferSize)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
index 916a506..64c10da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
@@ -18,32 +18,22 @@
 
 package org.apache.hadoop.scm.storage;
 
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.util.Time;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.SUCCESS;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
+import com.google.protobuf.ByteString;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
 
 /**
  * An {@link OutputStream} used by the REST service in combination with the
@@ -67,12 +57,12 @@ public class ChunkOutputStream extends OutputStream {
   private final String key;
   private final String traceID;
   private final KeyData.Builder containerKeyData;
-  private final String streamId;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private ByteBuffer buffer;
+  private final String streamId;
+  private int chunkIndex;
   private int chunkSize;
-  private int streamBufferSize;
 
   /**
    * Creates a new ChunkOutputStream.
@@ -83,18 +73,14 @@ public class ChunkOutputStream extends OutputStream {
    * @param xceiverClient client to perform container calls
    * @param traceID container protocol call args
    * @param chunkSize chunk size
-   * @param maxBufferSize -- Controls the maximum amount of memory that we need
-   * to allocate data buffering.
    */
   public ChunkOutputStream(String containerKey, String key,
       XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
-      String traceID, int chunkSize, int maxBufferSize) {
+      String traceID, int chunkSize) {
     this.containerKey = containerKey;
     this.key = key;
     this.traceID = traceID;
     this.chunkSize = chunkSize;
-    this.streamBufferSize = maxBufferSize;
-
     KeyValue keyValue = KeyValue.newBuilder()
         .setKey("TYPE").setValue("KEY").build();
     this.containerKeyData = KeyData.newBuilder()
@@ -103,24 +89,22 @@ public class ChunkOutputStream extends OutputStream {
         .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
-    this.buffer = ByteBuffer.allocate(maxBufferSize);
+    this.buffer = ByteBuffer.allocate(chunkSize);
     this.streamId = UUID.randomUUID().toString();
+    this.chunkIndex = 0;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public synchronized void write(int b) throws IOException {
     checkOpen();
-    byte[] c = new byte[1];
-    c[0] = (byte) b;
-    write(c, 0, 1);
+    int rollbackPosition = buffer.position();
+    int rollbackLimit = buffer.limit();
+    buffer.put((byte)b);
+    if (buffer.position() == chunkSize) {
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     if (b == null) {
@@ -134,90 +118,17 @@ public class ChunkOutputStream extends OutputStream {
       return;
     }
     checkOpen();
-    int rollbackPosition = buffer.position();
-    int rollbackLimit = buffer.limit();
-    try {
-      List<ImmutablePair<CompletableFuture<ContainerProtos
-              .ContainerCommandResponseProto>, ChunkInfo>>
-          writeFutures = writeInParallel(b, off, len);
-      // This is a rendezvous point for this function call, all chunk I/O
-      // for this block must complete before we can declare this call as
-      // complete.
-
-      // Wait until all the futures complete or throws an exception if any of
-      // the calls ended with an exception this call will throw.
-      // if futures is null, it means that we wrote the data to the buffer and
-      // returned.
-      if (writeFutures != null) {
-        CompletableFuture.allOf(writeFutures.toArray(new
-            CompletableFuture[writeFutures.size()])).join();
-
-        // Wrote this data, we will clear this buffer now.
-        buffer.clear();
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      buffer.position(rollbackPosition);
-      buffer.limit(rollbackLimit);
-      throw new IOException("Unexpected error in write. ", e);
-    }
-  }
-
-  /**
-   * Write a given block into many small chunks in parallel.
-   *
-   * @param b
-   * @param off
-   * @param len
-   * @throws IOException
-   * @throws ExecutionException
-   * @throws InterruptedException
-   */
-  public List<ImmutablePair<CompletableFuture<ContainerProtos
-          .ContainerCommandResponseProto>, ChunkInfo>>
-      writeInParallel(byte[] b, int off, int len)
-      throws IOException, ExecutionException, InterruptedException {
-
-    Preconditions.checkArgument(len <= streamBufferSize,
-        "A chunk write cannot be " + "larger than max buffer size limit.");
-    long newBlockCount = len / chunkSize;
-    buffer.put(b, off, len);
-    List<ImmutablePair<CompletableFuture<ContainerProtos
-            .ContainerCommandResponseProto>, ChunkInfo>>
-        writeFutures = new LinkedList<>();
-
-    // We if must have at least a chunkSize of data ready to write, if so we
-    // will go ahead and start writing that data.
-    if (buffer.position() >= chunkSize) {
-      // Allocate new byte slices which will point to each chunk of data
-      // that we want to write. Divide the byte buffer into individual chunks
-      // each of length equals to chunkSize max where each chunk will be
-      // assigned a chunkId where, for each chunk the async write requests will
-      // be made and wait for all of them to return before the write call
-      // returns.
-      for (int chunkId = 0; chunkId < newBlockCount; chunkId++) {
-        // Please note : We are not flipping the slice when we write since
-        // the slices are pointing the buffer start and end as needed for
-        // the chunk write. Also please note, Duplicate does not create a
-        // copy of data, it only creates metadata that points to the data
-        // stream.
-        ByteBuffer chunk = buffer.duplicate();
-        Preconditions.checkState((chunkId * chunkSize) < buffer.limit(),
-            "Chunk offset cannot be beyond the limits of the buffer.");
-        chunk.position(chunkId * chunkSize);
-        // Min handles the case where the last block might be lesser than
-        // chunk Size.
-        chunk.limit(chunk.position() +
-            Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize)));
-
-        // Schedule all the writes, this is a non-block call which returns
-        // futures. We collect these futures and wait for all  of them to
-        // complete in the next line.
-        writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize));
+    while (len > 0) {
+      int writeLen = Math.min(chunkSize - buffer.position(), len);
+      int rollbackPosition = buffer.position();
+      int rollbackLimit = buffer.limit();
+      buffer.put(b, off, writeLen);
+      if (buffer.position() == chunkSize) {
+        flushBufferToChunk(rollbackPosition, rollbackLimit);
       }
-      return writeFutures;
+      off += writeLen;
+      len -= writeLen;
     }
-    // Nothing to do , return null.
-    return null;
   }
 
   @Override
@@ -226,19 +137,7 @@ public class ChunkOutputStream extends OutputStream {
     if (buffer.position() > 0) {
       int rollbackPosition = buffer.position();
       int rollbackLimit = buffer.limit();
-      ByteBuffer chunk = buffer.duplicate();
-      try {
-
-        ImmutablePair<CompletableFuture<ContainerProtos
-            .ContainerCommandResponseProto>, ChunkInfo>
-            result = writeChunkToContainer(chunk, 0, chunkSize);
-        updateChunkInfo(result);
-        buffer.clear();
-      } catch (ExecutionException | InterruptedException e) {
-        buffer.position(rollbackPosition);
-        buffer.limit(rollbackLimit);
-        throw new IOException("Failure in flush", e);
-      }
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
     }
   }
 
@@ -248,20 +147,10 @@ public class ChunkOutputStream extends OutputStream {
         buffer != null) {
       try {
         if (buffer.position() > 0) {
-          // This flip is needed since this is the real buffer to which we
-          // are writing and position will have moved each time we did a put.
-          buffer.flip();
-
-          // Call get immediately to make this call Synchronous.
-
-          ImmutablePair<CompletableFuture<ContainerProtos
-              .ContainerCommandResponseProto>, ChunkInfo>
-              result = writeChunkToContainer(buffer, 0, buffer.limit());
-          updateChunkInfo(result);
-          buffer.clear();
+          writeChunkToContainer();
         }
         putKey(xceiverClient, containerKeyData.build(), traceID);
-      } catch (IOException | InterruptedException | ExecutionException e) {
+      } catch (IOException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
       } finally {
@@ -274,24 +163,6 @@ public class ChunkOutputStream extends OutputStream {
 
   }
 
-  private void updateChunkInfo(
-      ImmutablePair<
-          CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
-          ChunkInfo
-          > result) throws InterruptedException, ExecutionException {
-    // Wait for this call to complete.
-    ContainerProtos.ContainerCommandResponseProto response =
-        result.getLeft().get();
-
-    // If the write call to the chunk is successful, we need to add that
-    // chunk information to the containerKeyData.
-    // TODO: Clean up the garbage in case of failure.
-    if(response.getResult() == SUCCESS) {
-      ChunkInfo chunk = result.getRight();
-      containerKeyData.addChunks(chunk);
-    }
-  }
-
   /**
    * Checks if the stream is open.  If not, throws an exception.
    *
@@ -304,35 +175,53 @@ public class ChunkOutputStream extends OutputStream {
   }
 
   /**
+   * Attempts to flush buffered writes by writing a new chunk to the container.
+   * If successful, then clears the buffer to prepare to receive writes for a
+   * new chunk.
+   *
+   * @param rollbackPosition position to restore in buffer if write fails
+   * @param rollbackLimit limit to restore in buffer if write fails
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void flushBufferToChunk(int rollbackPosition,
+      int rollbackLimit) throws IOException {
+    boolean success = false;
+    try {
+      writeChunkToContainer();
+      success = true;
+    } finally {
+      if (success) {
+        buffer.clear();
+      } else {
+        buffer.position(rollbackPosition);
+        buffer.limit(rollbackLimit);
+      }
+    }
+  }
+
+  /**
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
    *
-   * @param data -- Data to write.
-   * @param offset - offset to the data buffer
-   * @param len - Length in bytes
-   * @return Returns a Immutable pair -- A future object that will contian
-   * the result of the operation, and the chunkInfo that we wrote.
-   *
-   * @throws IOException
-   * @throws ExecutionException
-   * @throws InterruptedException
+   * @throws IOException if there is an I/O error while performing the call
    */
-  private ImmutablePair<
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
-      ChunkInfo>
-      writeChunkToContainer(ByteBuffer data, int offset, int len)
-      throws IOException, ExecutionException, InterruptedException {
-
-
-    ByteString dataString = ByteString.copyFrom(data);
-    ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName(
+  private synchronized void writeChunkToContainer() throws IOException {
+    buffer.flip();
+    ByteString data = ByteString.copyFrom(buffer);
+    ChunkInfo chunk = ChunkInfo
+        .newBuilder()
+        .setChunkName(
             DigestUtils.md5Hex(key) + "_stream_"
-                + streamId + "_chunk_" + Time.monotonicNowNanos())
+                + streamId + "_chunk_" + ++chunkIndex)
         .setOffset(0)
-        .setLen(len)
+        .setLen(data.size())
         .build();
-    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response =
-        writeChunk(xceiverClient, chunk, key, dataString, traceID);
-    return new ImmutablePair(response, chunk);
+    try {
+      writeChunk(xceiverClient, chunk, key, data, traceID);
+    } catch (IOException e) {
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+    }
+    containerKeyData.addChunks(chunk);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
index 7d4c72d..1cde67c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
@@ -53,9 +53,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
 import org.apache.hadoop.scm.XceiverClientSpi;
 
 /**
@@ -165,10 +162,9 @@ public final class ContainerProtocolCalls  {
    * @param traceID container protocol call args
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static CompletableFuture<ContainerCommandResponseProto> writeChunk(
-      XceiverClientSpi xceiverClient, ChunkInfo chunk, String key,
-      ByteString data, String traceID)
-      throws IOException, ExecutionException, InterruptedException {
+  public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
+      String key, ByteString data, String traceID)
+      throws IOException {
     WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -183,7 +179,8 @@ public final class ContainerProtocolCalls  {
         .setDatanodeID(id)
         .setWriteChunk(writeChunkRequest)
         .build();
-    return xceiverClient.sendCommandAsync(request);
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 137f8f9..1830c71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -67,11 +67,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
-
 /**
  * A {@link StorageHandler} implementation that distributes object storage
  * across the nodes of an HDFS cluster.
@@ -91,7 +86,6 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final boolean useRatis;
   private final OzoneProtos.ReplicationType type;
   private final OzoneProtos.ReplicationFactor factor;
-  private final long streamBufferSize;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -133,9 +127,6 @@ public final class DistributedStorageHandler implements StorageHandler {
           chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
       chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
     }
-    // streamBufferSize by default is set to default scm block size.
-    streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
-        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
   }
 
   @Override
@@ -427,7 +418,6 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setRequestID(args.getRequestID())
             .setType(xceiverClientManager.getType())
             .setFactor(xceiverClientManager.getFactor())
-            .setStreamBufferSize(streamBufferSize)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 4df99f9..31c3901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -691,15 +691,6 @@
     </description>
   </property>
   <property>
-    <name>ozone.output.stream.buffer.size.in.mb</name>
-    <value>256</value>
-    <tag>OZONE</tag>
-    <description>
-      The maximum size of the buffer allocated for the ozone output stream for
-      write. Default size is equals to scm block size.
-    </description>
-  </property>
-  <property>
     <name>ozone.scm.chunk.size</name>
     <value>16777216</value>
     <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index fc4bedc..c8427f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -1114,7 +1114,7 @@ public class TestKeySpaceManager {
           .getMetadataManager().getExpiredOpenKeys();
       Assert.assertEquals(0, openKeys.size());
 
-      //Thread.sleep(2000);
+      Thread.sleep(2000);
 
       openKeys = cluster.getKeySpaceManager().getMetadataManager()
           .getExpiredOpenKeys();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org