You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2017/05/15 17:35:37 UTC

hadoop git commit: HDFS-11694. Block Storage: Add Support for 2 BlockIDBuffers and also for periodic flush of BlockIDBuffer. Contributed by Mukul Kumar Singh

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 055e556e6 -> e40e09540


HDFS-11694. Block Storage: Add Support for 2 BlockIDBuffers and also for periodic flush of BlockIDBuffer. Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e40e0954
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e40e0954
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e40e0954

Branch: refs/heads/HDFS-7240
Commit: e40e09540b83f97b3aab44e6cf61c89ed437a36c
Parents: 055e556
Author: Chen Liang <cl...@apache.org>
Authored: Mon May 15 10:35:44 2017 -0700
Committer: Chen Liang <cl...@apache.org>
Committed: Mon May 15 10:35:44 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/cblock/CBlockConfigKeys.java  |   5 +
 .../cblock/jscsiHelper/CBlockTargetMetrics.java |  72 +++-
 .../jscsiHelper/ContainerCacheFlusher.java      |   2 +
 .../cache/impl/AsyncBlockWriter.java            |  89 +----
 .../cache/impl/BlockBufferFlushTask.java        | 118 ++++++
 .../cache/impl/BlockBufferManager.java          | 182 +++++++++
 .../apache/hadoop/cblock/TestBufferManager.java | 394 +++++++++++++++++++
 .../hadoop/cblock/TestLocalBlockCache.java      | 107 +----
 8 files changed, 769 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index 74f5dc6..c7222ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -132,6 +132,11 @@ public final class CBlockConfigKeys {
       "dfs.cblock.cache.block.buffer.size";
   public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
 
+  public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS =
+      "dfs.cblock.block.buffer.flush.interval.seconds";
+  public static final int
+      DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT = 60;
+
   // jscsi server settings
   public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY =
       "dfs.cblock.jscsi.server.address";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
index 1174c33..5efb461 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
@@ -39,14 +39,15 @@ public class CBlockTargetMetrics {
   @Metric private MutableCounterLong numWriteOps;
   @Metric private MutableCounterLong numReadCacheHits;
   @Metric private MutableCounterLong numReadCacheMiss;
+  @Metric private MutableCounterLong numDirectBlockWrites;
 
   // Cblock internal Metrics
-  @Metric private MutableCounterLong numDirectBlockWrites;
-  @Metric private MutableCounterLong numBlockBufferFlush;
   @Metric private MutableCounterLong numDirtyLogBlockRead;
-  @Metric private MutableCounterLong numDirtyLogBlockUpdated;
   @Metric private MutableCounterLong numBytesDirtyLogRead;
   @Metric private MutableCounterLong numBytesDirtyLogWritten;
+  @Metric private MutableCounterLong numBlockBufferFlushCompleted;
+  @Metric private MutableCounterLong numBlockBufferFlushTriggered;
+  @Metric private MutableCounterLong numBlockBufferUpdates;
 
   // Failure Metrics
   @Metric private MutableCounterLong numReadLostBlocks;
@@ -54,7 +55,10 @@ public class CBlockTargetMetrics {
   @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
   @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
   @Metric private MutableCounterLong numFailedDirectBlockWrites;
-  @Metric private MutableCounterLong numFailedDirtyBlockFlushes;
+  @Metric private MutableCounterLong numIllegalDirtyLogFiles;
+  @Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
+  @Metric private MutableCounterLong numFailedBlockBufferFlushes;
+  @Metric private MutableCounterLong numInterruptedBufferWaits;
 
   // Latency based Metrics
   @Metric private MutableRate dbReadLatency;
@@ -114,8 +118,12 @@ public class CBlockTargetMetrics {
     numFailedReadBlocks.incr();
   }
 
-  public void incNumBlockBufferFlush() {
-    numBlockBufferFlush.incr();
+  public void incNumBlockBufferFlushCompleted() {
+    numBlockBufferFlushCompleted.incr();
+  }
+
+  public void incNumBlockBufferFlushTriggered() {
+    numBlockBufferFlushTriggered.incr();
   }
 
   public void incNumDirtyLogBlockRead() {
@@ -126,16 +134,28 @@ public class CBlockTargetMetrics {
     numBytesDirtyLogRead.incr(bytes);
   }
 
-  public void incNumDirtyLogBlockUpdated() {
-    numDirtyLogBlockUpdated.incr();
+  public void incNumBlockBufferUpdates() {
+    numBlockBufferUpdates.incr();
   }
 
   public void incNumBytesDirtyLogWritten(int bytes) {
     numBytesDirtyLogWritten.incr(bytes);
   }
 
-  public void incNumFailedDirtyBlockFlushes() {
-    numFailedDirtyBlockFlushes.incr();
+  public void incNumFailedBlockBufferFlushes() {
+    numFailedBlockBufferFlushes.incr();
+  }
+
+  public void incNumInterruptedBufferWaits() {
+    numInterruptedBufferWaits.incr();
+  }
+
+  public void incNumIllegalDirtyLogFiles() {
+    numIllegalDirtyLogFiles.incr();
+  }
+
+  public void incNumFailedDirtyLogFileDeletes() {
+    numFailedDirtyLogFileDeletes.incr();
   }
 
   public void updateDBReadLatency(long latency) {
@@ -213,8 +233,13 @@ public class CBlockTargetMetrics {
   }
 
   @VisibleForTesting
-  public long getNumBlockBufferFlush() {
-    return numBlockBufferFlush.value();
+  public long getNumBlockBufferFlushCompleted() {
+    return numBlockBufferFlushCompleted.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockBufferFlushTriggered() {
+    return numBlockBufferFlushTriggered.value();
   }
 
   @VisibleForTesting
@@ -228,8 +253,8 @@ public class CBlockTargetMetrics {
   }
 
   @VisibleForTesting
-  public long getNumDirtyLogBlockUpdated() {
-    return numDirtyLogBlockUpdated.value();
+  public long getNumBlockBufferUpdates() {
+    return numBlockBufferUpdates.value();
   }
 
   @VisibleForTesting
@@ -238,7 +263,22 @@ public class CBlockTargetMetrics {
   }
 
   @VisibleForTesting
-  public long getNumFailedDirtyBlockFlushes() {
-    return numFailedDirtyBlockFlushes.value();
+  public long getNumFailedBlockBufferFlushes() {
+    return numFailedBlockBufferFlushes.value();
+  }
+
+  @VisibleForTesting
+  public long getNumInterruptedBufferWaits() {
+    return numInterruptedBufferWaits.value();
+  }
+
+  @VisibleForTesting
+  public long getNumIllegalDirtyLogFiles() {
+    return numIllegalDirtyLogFiles.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedDirtyLogFileDeletes() {
+    return numFailedDirtyLogFileDeletes.value();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
index 905d9ba..c796f73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
@@ -363,6 +363,7 @@ public class ContainerCacheFlusher implements Runnable {
         if (finishCountMap.containsKey(message.getFileName())) {
           // In theory this should never happen. But if it happened,
           // we need to know it...
+          getTargetMetrics().incNumIllegalDirtyLogFiles();
           LOG.error("Adding DirtyLog file again {} current count {} new {}",
               message.getFileName(),
               finishCountMap.get(message.getFileName()).expectedCount,
@@ -516,6 +517,7 @@ public class ContainerCacheFlusher implements Runnable {
           }*/
           fileDeleted.set(true);
         } catch (Exception e) {
+          flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes();
           LOG.error("Error deleting dirty log file:" + filePath, e);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index 1273cd2..d8e9839 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -32,19 +32,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Paths;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
-
 /**
  * A Queue that is used to write blocks asynchronously to the container.
  */
@@ -53,12 +46,6 @@ public class AsyncBlockWriter {
       LoggerFactory.getLogger(AsyncBlockWriter.class);
 
   /**
-   * Right now we have a single buffer and we block when we write it to
-   * the file.
-   */
-  private final ByteBuffer blockIDBuffer;
-
-  /**
    * XceiverClientManager is used to get client connections to a set of
    * machines.
    */
@@ -80,8 +67,8 @@ public class AsyncBlockWriter {
    * The cache this writer is operating against.
    */
   private final CBlockLocalCache parentCache;
-  private final int blockBufferSize;
-  private final static String DIRTY_LOG_PREFIX = "DirtyLog";
+  private final BlockBufferManager blockBufferManager;
+  public final static String DIRTY_LOG_PREFIX = "DirtyLog";
   private AtomicLong localIoCount;
 
   /**
@@ -95,14 +82,11 @@ public class AsyncBlockWriter {
     Preconditions.checkNotNull(cache, "Cache cannot be null.");
     Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
     localIoCount = new AtomicLong();
-    blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
-        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
-    LOG.info("Cache: Block Size: {}", blockBufferSize);
     lock = new ReentrantLock();
     notEmpty = lock.newCondition();
     parentCache = cache;
     xceiverClientManager = cache.getClientManager();
-    blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
+    blockBufferManager = new BlockBufferManager(config, parentCache);
   }
 
   public void start() throws IOException {
@@ -113,6 +97,7 @@ public class AsyncBlockWriter {
       throw new IllegalStateException("Cache Directory create failed, Cannot " +
           "continue. Log Dir: {}" + logDir);
     }
+    blockBufferManager.start();
   }
 
   /**
@@ -179,11 +164,7 @@ public class AsyncBlockWriter {
             block.getBlockID(), endTime - startTime, datahash);
       }
       block.clearData();
-      parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
-      blockIDBuffer.putLong(block.getBlockID());
-      if (blockIDBuffer.remaining() == 0) {
-        writeBlockBufferToFile(blockIDBuffer);
-      }
+      blockBufferManager.addToBlockBuffer(block.getBlockID());
     } else {
       Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
       String containerName = pipeline.getContainerName();
@@ -216,68 +197,10 @@ public class AsyncBlockWriter {
   }
 
   /**
-   * Write Block Buffer to file.
-   *
-   * @param blockBuffer - ByteBuffer
-   * @throws IOException
-   */
-  private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer)
-      throws IOException {
-    long startTime = Time.monotonicNow();
-    boolean append = false;
-    int bytesWritten = 0;
-
-    // If there is nothing written to blockId buffer,
-    // then skip flushing of blockId buffer
-    if (blockBuffer.position() == 0) {
-      return;
-    }
-
-    blockBuffer.flip();
-    String fileName =
-        String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
-    String log = Paths.get(parentCache.getDbPath().toString(), fileName)
-        .toString();
-
-    try {
-      FileChannel channel = new FileOutputStream(log, append).getChannel();
-      bytesWritten = channel.write(blockBuffer);
-    } catch (Exception ex) {
-      LOG.error("Unable to sync the Block map to disk -- This might cause a " +
-          "data loss or corruption", ex);
-      parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes();
-      throw ex;
-    } finally {
-      blockBuffer.clear();
-    }
-
-    parentCache.processDirtyMessage(fileName);
-    blockIDBuffer.clear();
-    long endTime = Time.monotonicNow();
-    if (parentCache.isTraceEnabled()) {
-      parentCache.getTracer().info(
-          "Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
-          endTime - startTime, bytesWritten);
-    }
-
-    parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
-    parentCache.getTargetMetrics().incNumBlockBufferFlush();
-    parentCache.getTargetMetrics()
-        .updateBlockBufferFlushLatency(endTime - startTime);
-    LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
-        bytesWritten, endTime - startTime);
-  }
-
-  /**
    * Shutdown by writing any pending I/O to dirtylog buffer.
    */
   public void shutdown() {
-    try {
-      writeBlockBufferToFile(this.blockIDBuffer);
-    } catch (IOException e) {
-      LOG.error("Unable to sync the Block map to disk -- This might cause a " +
-          "data loss or corruption");
-    }
+    blockBufferManager.shutdown();
   }
   /**
    * Returns tracer.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
new file mode 100644
index 0000000..c61a7a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+
+/**
+ * This task is responsible for flushing the BlockIDBuffer
+ * to Dirty Log File. This Dirty Log file is used later by
+ * ContainerCacheFlusher when the data is written to container
+ */
+public class BlockBufferFlushTask implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockBufferFlushTask.class);
+  private final CBlockLocalCache parentCache;
+  private final BlockBufferManager bufferManager;
+  private final ByteBuffer blockIDBuffer;
+
+  BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache,
+                       BlockBufferManager manager) {
+    this.parentCache = parentCache;
+    this.bufferManager = manager;
+    this.blockIDBuffer = blockIDBuffer;
+  }
+
+  /**
+   * When an object implementing interface <code>Runnable</code> is used
+   * to create a thread, starting the thread causes the object's
+   * <code>run</code> method to be called in that separately executing
+   * thread.
+   * <p>
+   * The general contract of the method <code>run</code> is that it may
+   * take any action whatsoever.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    try {
+      writeBlockBufferToFile(blockIDBuffer);
+    } catch (Exception e) {
+      parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes();
+      LOG.error("Unable to sync the Block map to disk with "
+          + (blockIDBuffer.position() / Long.SIZE) + "entries "
+          + "-- NOTE: This might cause a data loss or corruption", e);
+    } finally {
+      bufferManager.releaseBuffer(blockIDBuffer);
+    }
+  }
+
+  /**
+   * Write Block Buffer to file.
+   *
+   * @param buffer - ByteBuffer
+   * @throws IOException
+   */
+  private void writeBlockBufferToFile(ByteBuffer buffer)
+      throws IOException {
+    long startTime = Time.monotonicNow();
+    boolean append = false;
+
+    // If there is nothing written to blockId buffer,
+    // then skip flushing of blockId buffer
+    if (buffer.position() == 0) {
+      return;
+    }
+
+    buffer.flip();
+    String fileName =
+        String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX,
+                      Time.monotonicNow());
+    String log = Paths.get(parentCache.getDbPath().toString(), fileName)
+        .toString();
+
+    FileChannel channel = new FileOutputStream(log, append).getChannel();
+    int bytesWritten = channel.write(buffer);
+    channel.close();
+    buffer.clear();
+    parentCache.processDirtyMessage(fileName);
+    long endTime = Time.monotonicNow();
+    if (parentCache.isTraceEnabled()) {
+      parentCache.getTracer().info(
+          "Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
+          endTime - startTime, bytesWritten);
+    }
+
+    parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted();
+    parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
+    parentCache.getTargetMetrics().
+        updateBlockBufferFlushLatency(endTime - startTime);
+    LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
+        bytesWritten, endTime - startTime);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
new file mode 100644
index 0000000..7e3aed1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_THREAD_PRIORITY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
+
+/**
+ * This class manages the block ID buffer.
+ * Block ID Buffer keeps a list of blocks which are in leveldb cache
+ * This buffer is used later when the blocks are flushed to container
+ *
+ * Two blockIDBuffers are maintained so that write are not blocked when
+ * DirtyLog is being written. Once a blockIDBuffer is full, it will be
+ * enqueued for DirtyLog write while the other buffer accepts new write.
+ * Once the DirtyLog write is done, the buffer is returned back to the pool.
+ *
+ * There are three triggers for blockIDBuffer flush
+ * 1) BlockIDBuffer is full,
+ * 2) Time period defined for blockIDBuffer flush has elapsed.
+ * 3) Shutdown
+ */
+public class BlockBufferManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockBufferManager.class);
+
+  private enum FlushReason {
+    BUFFER_FULL,
+    SHUTDOWN,
+    TIMER
+  };
+
+  private final int blockBufferSize;
+  private final CBlockLocalCache parentCache;
+  private final ScheduledThreadPoolExecutor scheduledExecutor;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  private final int intervalSeconds;
+  private final ArrayBlockingQueue<ByteBuffer> acquireQueue;
+  private final ArrayBlockingQueue<Runnable> workQueue;
+  private ByteBuffer currentBuffer;
+
+  BlockBufferManager(Configuration config, CBlockLocalCache parentCache) {
+    this.parentCache = parentCache;
+    this.scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+
+    this.intervalSeconds =
+        config.getInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS,
+            DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT);
+
+    long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS,
+        DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT);
+    this.workQueue = new ArrayBlockingQueue<>(2, true);
+    int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
+        DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
+    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Cache Block Buffer Manager Thread #%d")
+        .setDaemon(true)
+        .setPriority(threadPri)
+        .build();
+    /*
+     * starting a thread pool with core pool size of 1 and maximum of 2 threads
+     * as there are maximum of 2 buffers which can be flushed at the same time.
+     */
+    this.threadPoolExecutor = new ThreadPoolExecutor(1, 2,
+        keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
+        new ThreadPoolExecutor.AbortPolicy());
+
+    this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
+        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
+    this.acquireQueue = new ArrayBlockingQueue<>(2, true);
+
+    for (int i = 0; i < 2; i++) {
+      acquireQueue.add(ByteBuffer.allocate(blockBufferSize));
+    }
+    // get the first buffer to be used
+    this.currentBuffer = acquireQueue.remove();
+
+    LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}",
+        blockBufferSize, intervalSeconds);
+  }
+
+  // triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns.
+  // This enqueue is asynchronous and hence triggerBlockBufferFlush will
+  // only block when there are no available buffers in acquireQueue
+  // Once the DirtyLog write is done, buffer is returned back to
+  // BlockBufferManager using releaseBuffer
+  private synchronized void triggerBlockBufferFlush(FlushReason reason) {
+    LOG.debug("Flush triggered because: " + reason.toString() +
+        " Num entries in buffer: " +
+        currentBuffer.position() / (Long.SIZE / Byte.SIZE) +
+        " Acquire Queue Size: " + acquireQueue.size());
+
+    parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered();
+    BlockBufferFlushTask flushTask =
+        new BlockBufferFlushTask(currentBuffer, parentCache, this);
+    threadPoolExecutor.submit(flushTask);
+    try {
+      currentBuffer = acquireQueue.take();
+    } catch (InterruptedException ex) {
+      currentBuffer = null;
+      parentCache.getTargetMetrics().incNumInterruptedBufferWaits();
+      LOG.error("wait on take operation on acquire queue interrupted", ex);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public synchronized void addToBlockBuffer(long blockId)  {
+    parentCache.getTargetMetrics().incNumBlockBufferUpdates();
+    currentBuffer.putLong(blockId);
+    // if no space left, flush this buffer
+    if (currentBuffer.remaining() == 0) {
+      triggerBlockBufferFlush(FlushReason.BUFFER_FULL);
+    }
+  }
+
+  public void releaseBuffer(ByteBuffer buffer) {
+    if (buffer.position() != 0) {
+      LOG.error("requeuing a non empty buffer with:{}",
+          "elements enqueued in the acquire queue",
+          buffer.position() / (Long.SIZE / Byte.SIZE));
+      buffer.reset();
+    }
+    // There should always be space in the queue to add an element
+    acquireQueue.add(buffer);
+  }
+
+  // Start a scheduled task to flush blockIDBuffer
+  public void start() {
+    Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER);
+    scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds,
+                                        intervalSeconds, TimeUnit.SECONDS);
+    threadPoolExecutor.prestartAllCoreThreads();
+  }
+
+  public void shutdown() {
+    triggerBlockBufferFlush(FlushReason.SHUTDOWN);
+    scheduledExecutor.shutdown();
+    threadPoolExecutor.shutdown();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
new file mode 100644
index 0000000..b488cbd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.
+    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
+
+/**
+ * Tests for Tests for local cache.
+ */
+public class TestBufferManager {
+  private final static long GB = 1024 * 1024 * 1024;
+  private final static int KB = 1024;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration config;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    File p = GenericTestUtils.getTestDir();
+    String path = p.getPath().concat(
+        TestOzoneContainer.class.getSimpleName());
+    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    cluster = new MiniOzoneCluster.Builder(config)
+        .numDataNodes(1).setHandlerType("distributed").build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(config);
+  }
+
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.cleanup(null, storageContainerLocationClient, cluster);
+  }
+
+  /**
+   * createContainerAndGetPipeline creates a set of containers and returns the
+   * Pipelines that define those containers.
+   *
+   * @param count - Number of containers to create.
+   * @return - List of Pipelines.
+   * @throws IOException
+   */
+  private List<Pipeline> createContainerAndGetPipeline(int count)
+      throws IOException {
+    List<Pipeline> containerPipelines = new LinkedList<>();
+    for (int x = 0; x < count; x++) {
+      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+      String containerName = "container" + RandomStringUtils.randomNumeric(10);
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(containerName);
+      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+      ContainerProtocolCalls.createContainer(client, traceID);
+      // This step is needed since we set private data on pipelines, when we
+      // read the list from CBlockServer. So we mimic that action here.
+      pipeline.setData(Longs.toByteArray(x));
+      containerPipelines.add(pipeline);
+    }
+    return containerPipelines;
+  }
+
+  /**
+   * This test writes some block to the cache and then shuts down the cache.
+   * The cache is then restarted to check that the
+   * correct number of blocks are read from Dirty Log
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testEmptyBlockBufferHandling() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(
+        TestOzoneContainer.class.getSimpleName() +
+            GenericTestUtils.getMethodName() +
+            RandomStringUtils.randomNumeric(4));
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    List<Pipeline> pipelines = createContainerAndGetPipeline(10);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    // Write data to the cache
+    cache.put(1, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(1, metrics.getNumWriteOps());
+    cache.put(2, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(2, metrics.getNumWriteOps());
+
+    // Store the previous block buffer position
+    Assert.assertEquals(2, metrics.getNumBlockBufferUpdates());
+    // Simulate a shutdown by closing the cache
+    cache.close();
+    Thread.sleep(1000);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
+                                metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+    Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits());
+
+    // Restart cache and check that right number of entries are read
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig,
+            xceiverClientManager, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread fllushListenerThread = new Thread(newFlusher);
+    fllushListenerThread.setDaemon(true);
+    fllushListenerThread.start();
+
+    Thread.sleep(5000);
+    Assert.assertEquals(metrics.getNumBlockBufferUpdates(),
+                                      newMetrics.getNumDirtyLogBlockRead());
+    Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
+            * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
+    // Now shutdown again, nothing should be flushed
+    newFlusher.shutdown();
+    Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates());
+    Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
+  }
+
+  @Test
+  public void testPeriodicFlush() throws IOException,
+      InterruptedException, TimeoutException{
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig
+        .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread.sleep(8000);
+    // Ticks will be at 5s, 10s and so on, so this count should be 1
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    // Nothing pushed to cache, so nothing should be written
+    Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+    cache.close();
+    // After close, another trigger should happen but still no data written
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+  }
+
+  @Test
+  public void testSingleBufferFlush() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+
+    for (int i = 0; i < 511; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    // After writing 511 block no flush should happen
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
+
+
+    // After one more block it should
+    cache.put(512, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    Thread.sleep(1000);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    cache.close();
+    Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE),
+                                metrics.getNumBytesDirtyLogWritten());
+  }
+
+  @Test
+  public void testMultipleBuffersFlush() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < 512; j++) {
+        cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8));
+      }
+      // Flush should be triggered after every 512 block write
+      Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered());
+    }
+    Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles());
+    Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes());
+    cache.close();
+    Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE),
+        metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted());
+  }
+
+  @Test
+  public void testSingleBlockFlush() throws IOException,
+      InterruptedException, TimeoutException{
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig
+        .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(createContainerAndGetPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    cache.put(0, data.getBytes(StandardCharsets.UTF_8));
+    Thread.sleep(8000);
+    // Ticks will be at 5s, 10s and so on, so this count should be 1
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
+    // 1 block written to cache, which should be flushed
+    Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    cache.close();
+    // After close, another trigger should happen but no data should be written
+    Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
+    Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e40e0954/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
index f15a5ed..2dd72b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
@@ -62,10 +62,6 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
 import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_TRACE_IO;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
 
 /**
  * Tests for Tests for local cache.
@@ -238,15 +234,12 @@ public class TestLocalBlockCache {
       cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
     }
     Assert.assertEquals(totalBlocks, metrics.getNumWriteOps());
+    Assert.assertEquals(totalBlocks,  metrics.getNumBlockBufferUpdates());
     LOG.info("Wrote 50K blocks, waiting for replication to finish.");
     GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
     long endTime = Time.monotonicNow();
     LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
         TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
-    long blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
-        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT);
-    Assert.assertEquals(metrics.getNumWriteOps() / blockBufferSize,
-        metrics.getNumBlockBufferFlush());
     // TODO: Read this data back.
     cache.close();
   }
@@ -278,6 +271,7 @@ public class TestLocalBlockCache {
     Assert.assertEquals(1, metrics.getNumReadOps());
     Assert.assertEquals(1, metrics.getNumReadLostBlocks());
     Assert.assertEquals(1, metrics.getNumReadCacheMiss());
+    cache.close();
   }
 
   @Test
@@ -508,95 +502,6 @@ public class TestLocalBlockCache {
   }
 
   /**
-   * This test writes some block to the cache and then shuts down the cache.
-   * The cache is then restarted to check that the
-   * correct number of blocks are read from Dirty Log
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testEmptyBlockBufferHandling() throws IOException,
-      InterruptedException, TimeoutException {
-    // Create a new config so that this tests write metafile to new location
-    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
-    URL p = flushTestConfig.getClass().getResource("");
-    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
-    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
-    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
-    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
-
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
-    String userName = "user" + RandomStringUtils.randomNumeric(4);
-    String data = RandomStringUtils.random(4 * KB);
-    List<Pipeline> pipelines = getContainerPipeline(10);
-
-    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
-        xceiverClientManager, metrics);
-    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
-        .setConfiguration(flushTestConfig)
-        .setVolumeName(volumeName)
-        .setUserName(userName)
-        .setPipelines(pipelines)
-        .setClientManager(xceiverClientManager)
-        .setBlockSize(4 * KB)
-        .setVolumeSize(50 * GB)
-        .setFlusher(flusher)
-        .setCBlockTargetMetrics(metrics)
-        .build();
-    cache.start();
-    // Write data to the cache
-    cache.put(1, data.getBytes(StandardCharsets.UTF_8));
-    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
-    Assert.assertEquals(1, metrics.getNumWriteOps());
-    cache.put(2, data.getBytes(StandardCharsets.UTF_8));
-    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
-    Assert.assertEquals(2, metrics.getNumWriteOps());
-
-    // Store the previous block buffer position
-    Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated());
-    // Simulate a shutdown by closing the cache
-    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
-    cache.close();
-    Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
-                                metrics.getNumBytesDirtyLogWritten());
-    Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes());
-
-    // Restart cache and check that right number of entries are read
-    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher newFlusher =
-        new ContainerCacheFlusher(flushTestConfig,
-            xceiverClientManager, newMetrics);
-    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
-        .setConfiguration(flushTestConfig)
-        .setVolumeName(volumeName)
-        .setUserName(userName)
-        .setPipelines(pipelines)
-        .setClientManager(xceiverClientManager)
-        .setBlockSize(4 * KB)
-        .setVolumeSize(50 * GB)
-        .setFlusher(newFlusher)
-        .setCBlockTargetMetrics(newMetrics)
-        .build();
-    newCache.start();
-    Thread flushListenerThread = new Thread(newFlusher);
-    flushListenerThread.setDaemon(true);
-    flushListenerThread.start();
-
-    Thread.sleep(5000);
-    Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
-                                      newMetrics.getNumDirtyLogBlockRead());
-    Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
-            * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
-    // Now shutdown again, nothing should be flushed
-    newCache.close();
-    newFlusher.shutdown();
-    Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
-    Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
-    Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
-  }
-
-  /**
    * This test writes some block to the cache and then shuts down the cache
    * The cache is then restarted with "short.circuit.io" disable to check
    * that the blocks are read correctly from the container.
@@ -642,9 +547,9 @@ public class TestLocalBlockCache {
         .setCBlockTargetMetrics(metrics)
         .build();
     cache.start();
-    Thread fllushListenerThread = new Thread(flusher);
-    fllushListenerThread.setDaemon(true);
-    fllushListenerThread.start();
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
     Assert.assertTrue(cache.isShortCircuitIOEnabled());
     // Write data to the cache
     for (int i = 0; i < 512; i++) {
@@ -686,7 +591,7 @@ public class TestLocalBlockCache {
     }
     Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
     Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
-    newFlusher.shutdown();
     newCache.close();
+    newFlusher.shutdown();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org