You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:07 UTC

[21/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
new file mode 100644
index 0000000..cdf7452
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Region function that flushes the HDFS event queues of a partitioned region.
+ * <p>
+ * {@link #flushQueue(PartitionedRegion, int)} drives the operation from the
+ * caller's side and retries until every bucket has reported success.
+ * {@link #execute(FunctionContext)} is the per-member part: it flushes the
+ * queues of the requested buckets that are local primaries and reports each
+ * bucket that finished flushing in time.
+ */
+@SuppressWarnings("serial")
+public class HDFSFlushQueueFunction implements Function, InternalEntity{
+  /** Maximum number of flush attempts (system property {@code gemfireXD.maxFlushQueueRetries}, default 3). */
+  private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3);
+  /** When set, the debug messages of this class are emitted even if debug logging is disabled. */
+  private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");
+  private static final Logger logger = LogService.getLogger();
+  private static final String ID = HDFSFlushQueueFunction.class.getName();
+  
+  /**
+   * Flushes the HDFS queues of all buckets of the given region, retrying up to
+   * {@code MAX_RETRIES} times for buckets that did not report success.
+   *
+   * @param pr the partitioned region whose bucket queues are flushed
+   * @param maxWaitTime overall time limit in seconds; {@code 0} means no limit
+   * @throws FunctionException if some buckets are still unflushed once the
+   *         retries or the time limit are exhausted
+   */
+  public static void flushQueue(PartitionedRegion pr, int maxWaitTime) {
+    
+    Set<Integer> buckets = new HashSet<Integer>(pr.getRegionAdvisor().getBucketSet());
+
+    // Convert to milliseconds in long arithmetic: the int product overflows
+    // for waits above Integer.MAX_VALUE / 1000 seconds and corrupts the deadline.
+    long maxWaitMillis = TimeUnit.SECONDS.toMillis(maxWaitTime);
+    long start = System.currentTimeMillis();
+    
+    int retries = 0;
+    long remaining = 0;
+    while (retries++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitMillis)) > 0) {
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets 
+            + ", attempt = " + retries 
+            + ", remaining = " + remaining));
+      }
+      
+      HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);
+      
+      HDFSFlushQueueResultCollector rc = new HDFSFlushQueueResultCollector(buckets);
+      AbstractExecution exec = (AbstractExecution) FunctionService
+          .onRegion(pr)
+          .withArgs(args)
+          .withCollector(rc);
+      exec.setWaitOnExceptionFlag(true);
+      
+      try {
+        exec.execute(ID);
+        if (rc.getResult()) {
+          if (logger.isDebugEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully")); 
+          }
+          return;
+        }
+      } catch (FunctionException e) {
+        // fall through and retry the buckets that did not report success
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e); 
+        }
+      }
+      
+      // only the buckets that have not been flushed yet are retried
+      buckets.removeAll(rc.getSuccessfulBuckets());
+      for (int bucketId : buckets) {
+        remaining = waitTime(start, maxWaitMillis);
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId)); 
+        }
+        // clamp before narrowing so a large remaining time cannot wrap to a negative int
+        int retryMillis = (int) Math.min(remaining, Integer.MAX_VALUE);
+        pr.getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper(retryMillis));
+      }
+    }
+    
+    pr.checkReadiness();
+    throw new FunctionException("Unable to flush the following buckets: " + buckets);
+  }
+  
+  /**
+   * Computes the time left before a deadline.
+   *
+   * @param start start of the waiting period, in milliseconds since the epoch
+   * @param max length of the waiting period in milliseconds; {@code 0} means no limit
+   * @return {@code Integer.MAX_VALUE} when {@code max} is 0, otherwise the
+   *         milliseconds left until {@code start + max} (zero or negative once
+   *         the deadline has passed)
+   */
+  private static long waitTime(long start, long max) {
+    if (max == 0) {
+      return Integer.MAX_VALUE;
+    }
+    return start + max - System.currentTimeMillis();
+  }
+  
+  /**
+   * Starts a flush on the queue of every requested bucket that is a local
+   * primary, then waits for the flushes and sends one {@link FlushStatus} per
+   * bucket that completed within the time limit carried by the arguments. A
+   * terminating {@code FlushStatus.last()} is always sent.
+   *
+   * @param context must be a {@link RegionFunctionContext} on a
+   *        {@link PartitionedRegion} with {@link HDFSFlushQueueArgs} arguments
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    RegionFunctionContext rfc = (RegionFunctionContext) context;
+    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
+    
+    HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) rfc.getArguments();
+    // work on a copy so the argument's bucket set is not modified
+    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
+    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
+
+    // phase 1: kick off all flushes without waiting
+    Map<Integer, AsyncFlushResult> flushes = new HashMap<Integer, AsyncFlushResult>();
+    for (int bucketId : buckets) {
+      try {
+        HDFSBucketRegionQueue brq = getQueue(pr, bucketId);
+        if (brq != null) {
+          if (logger.isDebugEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId)); 
+          }
+          flushes.put(bucketId, brq.flush());
+        }
+      } catch (ForceReattemptException e) {
+        // the bucket is simply not reported as flushed; the caller retries it
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e); 
+        }
+      }
+    }
+    
+    // phase 2: wait for each flush, sharing one deadline across all buckets
+    try {
+      long start = System.currentTimeMillis();
+      for (Map.Entry<Integer, AsyncFlushResult> flush : flushes.entrySet()) {
+        long remaining = waitTime(start, args.getMaxWaitTime());
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + flush.getKey() 
+              + " to complete flushing, remaining = " + remaining)); 
+        }
+        
+        if (flush.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
+          if (logger.isDebugEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + flush.getKey() + " flushed successfully")); 
+          }
+          rfc.getResultSender().sendResult(new FlushStatus(flush.getKey()));
+        }
+      }
+    } catch (InterruptedException e) {
+      // stop waiting, keep the interrupt status and still send the last result
+      Thread.currentThread().interrupt();
+    }
+    
+    if (logger.isDebugEnabled() || VERBOSE) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result")); 
+    }
+    rfc.getResultSender().lastResult(FlushStatus.last());
+  }
+
+  /**
+   * Looks up the HDFS bucket queue for a bucket of the given region.
+   *
+   * @return the bucket's queue, or {@code null} when the region's HDFS event
+   *         queue sender has no event processor
+   * @throws ForceReattemptException propagated from the bucket queue lookup
+   */
+  private HDFSBucketRegionQueue getQueue(PartitionedRegion pr, int bucketId) 
+      throws ForceReattemptException {
+    AsyncEventQueueImpl aeq = pr.getHDFSEventQueue();
+    AbstractGatewaySender gw = (AbstractGatewaySender) aeq.getSender();
+    AbstractGatewaySenderEventProcessor ep = gw.getEventProcessor();
+    if (ep == null) {
+      return null;
+    }
+    
+    ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue) ep.getQueue();
+    return queue.getBucketRegionQueue(pr, bucketId);
+  }
+  
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+  
+  /**
+   * Collects the {@link FlushStatus} replies of one flush attempt and reports
+   * whether every expected bucket was flushed.
+   */
+  public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
+    /** released by {@link #endResults()} */
+    private final CountDownLatch complete;
+    private final Set<Integer> expectedBuckets;
+    /** bucket ids reported as flushed; guarded by its own monitor */
+    private final Set<Integer> successfulBuckets;
+
+    private volatile ReplyProcessor21 processor;
+    
+    /**
+     * @param expectedBuckets the buckets that must all report success for
+     *        {@link #getResult()} to return {@code true}; the set is not copied
+     */
+    public HDFSFlushQueueResultCollector(Set<Integer> expectedBuckets) {
+      this.expectedBuckets = expectedBuckets;
+      
+      complete = new CountDownLatch(1);
+      successfulBuckets = new HashSet<Integer>();
+    }
+    
+    /**
+     * @return a snapshot of the bucket ids reported as flushed so far
+     */
+    public Set<Integer> getSuccessfulBuckets() {
+      synchronized (successfulBuckets) {
+        return new HashSet<Integer>(successfulBuckets);
+      }
+    }
+    
+    /**
+     * Blocks until {@link #endResults()} has been called.
+     *
+     * @return {@code true} if the set of successful buckets equals the set of
+     *         expected buckets
+     * @throws FunctionException if the calling thread is interrupted while waiting
+     */
+    @Override
+    public Boolean getResult() throws FunctionException {
+      try {
+        complete.await();
+        synchronized (successfulBuckets) {
+          // use the class logger like the rest of this file instead of a
+          // shadowing local obtained from the distributed system
+          if (logger.isDebugEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Expected buckets: " + expectedBuckets));
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Successful buckets: " + successfulBuckets));
+          }
+          return expectedBuckets.equals(successfulBuckets);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
+        throw new FunctionException(e);
+      }
+    }
+
+    /** Timed waits are not supported by this collector. */
+    @Override
+    public Boolean getResult(long timeout, TimeUnit unit)
+        throws FunctionException, InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Records the bucket id of every non-terminal {@link FlushStatus}; results
+     * of any other type are ignored.
+     */
+    @Override
+    public synchronized void addResult(DistributedMember memberID, Object result) {
+      if (result instanceof FlushStatus) {
+        FlushStatus status = (FlushStatus) result;
+        if (!status.isLast()) {
+          synchronized (successfulBuckets) {
+            successfulBuckets.add(status.getBucketId());
+          }
+        }
+      }
+    }
+
+    @Override
+    public void endResults() {
+      complete.countDown();
+    }
+
+    @Override
+    public void clearResults() {
+    }
+
+    @Override
+    public void setProcessor(ReplyProcessor21 processor) {
+      this.processor = processor;
+    }
+
+    @Override
+    public ReplyProcessor21 getProcessor() {
+      return processor;
+    }
+
+    @Override
+    public void setException(Throwable exception) {
+      // TODO not implemented: the reported exception is currently not recorded
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
new file mode 100644
index 0000000..ec0f9ff
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Arguments passed to the HDFSForceCompactionFunction: the buckets to compact,
+ * whether the compaction is a major one, and how long to wait for it.
+ */
+@SuppressWarnings("serial")
+public class HDFSForceCompactionArgs implements VersionedDataSerializable {
+
+  private static final Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+  private HashSet<Integer> buckets;
+
+  private boolean isMajor;
+
+  /** wait limit; 0 means the invocation is synchronous (see {@link #isSynchronous()}) */
+  private int maxWaitTime;
+
+  /** No-arg constructor; the fields are then populated by {@link #fromData(DataInput)}. */
+  public HDFSForceCompactionArgs() {
+  }
+
+  /**
+   * @param buckets ids of the buckets to compact; copied
+   * @param isMajor {@code true} to request a major compaction
+   * @param maxWaitTime wait limit, 0 for a synchronous invocation; must not be
+   *        {@code null} (it is unboxed)
+   */
+  public HDFSForceCompactionArgs(Set<Integer> buckets, boolean isMajor, Integer maxWaitTime) {
+    this.buckets = new HashSet<Integer>(buckets);
+    this.isMajor = isMajor;
+    this.maxWaitTime = maxWaitTime;
+  }
+
+  /** Writes buckets, the major flag and the wait limit, in that order. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashSet(buckets, out);
+    out.writeBoolean(isMajor);
+    out.writeInt(maxWaitTime);
+  }
+
+  /** Reads the fields in the order written by {@link #toData(DataOutput)}. */
+  @Override
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    this.buckets = DataSerializer.readHashSet(in);
+    this.isMajor = in.readBoolean();
+    this.maxWaitTime = in.readInt();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+
+  /**
+   * @return the internal bucket set (not a copy)
+   */
+  public Set<Integer> getBuckets() {
+    return buckets;
+  }
+
+  public void setBuckets(Set<Integer> buckets) {
+    this.buckets = new HashSet<Integer>(buckets);
+  }
+
+  public boolean isMajor() {
+    return isMajor;
+  }
+
+  public void setMajor(boolean isMajor) {
+    this.isMajor = isMajor;
+  }
+
+  /**
+   * @return {@code true} when the wait limit is 0
+   */
+  public boolean isSynchronous() {
+    return maxWaitTime == 0;
+  }
+
+  public int getMaxWaitTime() {
+    return this.maxWaitTime;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this))
+    .append(" buckets:").append(buckets)
+    .append(" isMajor:").append(isMajor)
+    .append(" maxWaitTime:").append(maxWaitTime);
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
new file mode 100644
index 0000000..d26ac1b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Function responsible for forcing a compaction on all members
+ * of the system. Each execution compacts the requested buckets that are
+ * local primaries and sends one {@link CompactionStatus} per bucket.
+ */
+@SuppressWarnings("serial")
+public class HDFSForceCompactionFunction implements Function, InternalEntity {
+
+  public static final int FORCE_COMPACTION_MAX_RETRIES = Integer.getInteger("gemfireXD.maxCompactionRetries", 3);
+
+  /** bucket id carried by the terminating result; it does not denote a real bucket */
+  public static final int BUCKET_ID_FOR_LAST_RESULT = -1;
+
+  public static final String ID = "HDFSForceCompactionFunction";
+
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Starts the compaction of the local primary buckets named in the
+   * {@link HDFSForceCompactionArgs} and reports the outcome per bucket.
+   * <p>
+   * A synchronous invocation waits for every compaction without a time limit.
+   * Otherwise the wait limit from the arguments is shared across all buckets
+   * and waiting stops as soon as it is used up.
+   *
+   * @param context must be a {@link RegionFunctionContext} on a
+   *        {@link PartitionedRegion}
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    if (context.isPossibleDuplicate()) {
+      // do not re-execute the function, another function
+      // targeting the failed buckets will be invoked
+      context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, false));
+      return;
+    }
+    RegionFunctionContext rfc = (RegionFunctionContext) context;
+    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
+    HDFSForceCompactionArgs args = (HDFSForceCompactionArgs) rfc.getArguments();
+    boolean synchronous = args.isSynchronous();
+    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets()); // copying avoids race when the function coordinator
+                                                                    // also runs the function locally
+    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
+
+    List<Future<CompactionStatus>> futures =  pr.forceLocalHDFSCompaction(buckets, args.isMajor(), 0);
+    int waitFor = args.getMaxWaitTime();
+    // remember an interrupt instead of dropping it; it is restored once all
+    // results have been sent so the remaining waits are not cut short
+    boolean interrupted = false;
+    try {
+      for (Future<CompactionStatus> future : futures) {
+        // TODO use a CompletionService instead
+        if (!synchronous && waitFor <= 0) {
+          break;
+        }
+        long start = System.currentTimeMillis();
+        try {
+          CompactionStatus status = synchronous ? future.get() : future.get(waitFor, TimeUnit.MILLISECONDS);
+          buckets.remove(status.getBucketId());
+          if (logger.isDebugEnabled()) {
+            logger.debug("HDFS: ForceCompaction sending result:"+status);
+          }
+          context.getResultSender().sendResult(status);
+        } catch (InterruptedException e) {
+          // send a list of failed buckets after waiting for all buckets
+          interrupted = true;
+        } catch (ExecutionException e) {
+          // send a list of failed buckets after waiting for all buckets
+          if (logger.isDebugEnabled()) {
+            logger.debug("HDFS: ForceCompaction failed for a bucket", e);
+          }
+        } catch (TimeoutException e) {
+          // do not wait for other buckets to complete
+          break;
+        } finally {
+          // charge the time against the budget on failures too, not only on success
+          waitFor -= System.currentTimeMillis() - start;
+        }
+      }
+      // for asynchronous invocation, the status is true for buckets that we did not wait for
+      // NOTE(review): a bucket whose future failed is also still in this set and is
+      // therefore reported as true in asynchronous mode; futures are not mapped to
+      // bucket ids here, so the two cases cannot be told apart at this point
+      boolean status = !synchronous;
+      for (Integer bucketId : buckets) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("HDFS: ForceCompaction sending result for bucket:"+bucketId);
+        }
+        context.getResultSender().sendResult(new CompactionStatus(bucketId, status));
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("HDFS: ForceCompaction sending last result");
+      }
+      context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, true));
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    // run compaction on primary members
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    // so that we can target re-execution on failed buckets
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
new file mode 100644
index 0000000..ee5e4aa
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+
+/**
+ * Result collector for HDFSForceCompactionFunction. It gathers the
+ * {@link CompactionStatus} replies and exposes which buckets succeeded and
+ * whether a retry is needed.
+ */
+public class HDFSForceCompactionResultCollector implements LocalResultCollector<Object, List<CompactionStatus>> {
+
+  /** list of received replies; guarded by its own monitor */
+  private final List<CompactionStatus> reply = new ArrayList<CompactionStatus>();
+
+  /** latch to block the caller of getResult()*/
+  private final CountDownLatch waitForResults = new CountDownLatch(1);
+
+  /** boolean to indicate if clearResults() was called to indicate a failure*/
+  private volatile boolean shouldRetry;
+
+  private volatile ReplyProcessor21 processor;
+
+  /**
+   * Blocks until {@link #endResults()} or {@link #clearResults()} is called.
+   *
+   * @return a snapshot of the replies received so far
+   * @throws FunctionException if the calling thread is interrupted while waiting
+   */
+  @Override
+  public List<CompactionStatus> getResult() throws FunctionException {
+    try {
+      waitForResults.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
+      throw new FunctionException(e);
+    }
+    synchronized (reply) {
+      // hand out a copy so the caller never iterates a list that is still being appended to
+      return new ArrayList<CompactionStatus>(reply);
+    }
+  }
+
+  /** Timed waits are not supported by this collector. */
+  @Override
+  public List<CompactionStatus> getResult(long timeout, TimeUnit unit)
+      throws FunctionException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Stores every {@link CompactionStatus} except the terminating one, which is
+   * marked by {@code BUCKET_ID_FOR_LAST_RESULT}; other result types are ignored.
+   */
+  @Override
+  public void addResult(DistributedMember memberID,
+      Object resultOfSingleExecution) {
+    if (resultOfSingleExecution instanceof CompactionStatus) {
+      CompactionStatus status = (CompactionStatus) resultOfSingleExecution;
+      if (status.getBucketId() != HDFSForceCompactionFunction.BUCKET_ID_FOR_LAST_RESULT) {
+        synchronized (reply) {
+          reply.add(status);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void endResults() {
+    waitForResults.countDown();
+  }
+
+  /**
+   * Flags that a retry is required and releases callers of {@link #getResult()}.
+   * Replies received so far are kept.
+   */
+  @Override
+  public void clearResults() {
+    this.shouldRetry = true;
+    waitForResults.countDown();
+  }
+
+  /**
+   * @return true if retry should be attempted
+   */
+  public boolean shouldRetry() {
+    return this.shouldRetry || !getFailedBucketIds().isEmpty();
+  }
+
+  private Set<Integer> getFailedBucketIds() {
+    return bucketIdsWithStatus(false);
+  }
+
+  /**
+   * @return ids of the buckets whose reply reported success
+   */
+  public Set<Integer> getSuccessfulBucketIds() {
+    return bucketIdsWithStatus(true);
+  }
+
+  /** Collects the bucket ids of all replies whose status flag equals {@code wanted}. */
+  private Set<Integer> bucketIdsWithStatus(boolean wanted) {
+    Set<Integer> result = new HashSet<Integer>();
+    synchronized (reply) {
+      for (CompactionStatus status : reply) {
+        if (status.isStatus() == wanted) {
+          result.add(status.getBucketId());
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void setProcessor(ReplyProcessor21 processor) {
+    this.processor = processor;
+  }
+
+  @Override
+  public ReplyProcessor21 getProcessor() {
+    return this.processor;
+  }
+
+  @Override
+  public void setException(Throwable exception) {
+    // TODO not implemented: the reported exception is currently not recorded
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
new file mode 100644
index 0000000..789fe4d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+/**
+ * Function that reports, as its single result, the value of
+ * {@code lastLocalMajorHDFSCompaction()} of the partitioned region it is
+ * executed on; callers use it to find the oldest major compaction
+ * timestamp among the members.
+ */
+@SuppressWarnings("serial")
+public class HDFSLastCompactionTimeFunction extends FunctionAdapter implements InternalEntity{
+
+  public static final String ID = "HDFSLastCompactionTimeFunction";
+
+  /**
+   * Sends the region's last local major compaction value as the last result.
+   *
+   * @param context must be a {@link RegionFunctionContext} whose data set is a
+   *        {@link PartitionedRegion}
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    final RegionFunctionContext regionContext = (RegionFunctionContext) context;
+    final PartitionedRegion region = (PartitionedRegion) regionContext.getDataSet();
+    regionContext.getResultSender().lastResult(region.lastLocalMajorHDFSCompaction());
+  }
+
+  /** @return the fixed identifier {@link #ID} */
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  /** @return always {@code true} */
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  /** @return always {@code true} */
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
new file mode 100644
index 0000000..6d70dce
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.cache.GemFireCache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Cache for hoplog organizers associated with buckets of a region. The director creates an
+ * instance of organizer on first get request. It does not read HDFS in advance. Creation of
+ * an organizer depends on file system initialization that takes place outside this class.
+ * This class also provides utility methods to monitor usage and manage bucket sets.
+ * 
+ */
+public class HDFSRegionDirector {
+  /*
+   * Maps each region name to its listener and store objects. This map must be populated before file
+   * organizers of a bucket can be created
+   */
+  private final ConcurrentHashMap<String, HdfsRegionManager> regionManagerMap;
+  
+  /**
+   * regions of this Gemfire cache are managed by this director. TODO this
+   * should be final and be provided at the time of creation of this instance or
+   * through a cache directory
+   */
+  private GemFireCache cache;
+  
+  // singleton instance
+  private static HDFSRegionDirector instance;
+  
+  final ScheduledExecutorService janitor;
+  private JanitorTask janitorTask;
+  
+  private static final Logger logger = LogService.getLogger();
+  protected final static String logPrefix = "<" + "RegionDirector" + "> ";
+  
+  
+  private HDFSRegionDirector() {
+    regionManagerMap = new ConcurrentHashMap<String, HDFSRegionDirector.HdfsRegionManager>();
+    janitor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread thread = new Thread(r, "HDFSRegionJanitor");
+        thread.setDaemon(true);
+        return thread;
+      }
+    });
+    
+    long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
+        HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+    
+    janitorTask = new JanitorTask();
+    janitor.scheduleWithFixedDelay(janitorTask, interval, interval,
+        TimeUnit.SECONDS);
+  }
+  
+  /**
+   * Returns the shared director, creating it on first use. The method is
+   * synchronized on the class, as is {@link #reset()}, which discards the
+   * instance again.
+   *
+   * @return the singleton director, never null
+   */
+  public synchronized static HDFSRegionDirector getInstance() {
+    if (instance != null) {
+      return instance;
+    }
+    instance = new HDFSRegionDirector();
+    return instance;
+  }
+  
+  /**
+   * Sets the cache whose regions are managed by this director.
+   *
+   * @param cache the cache to associate with this director
+   * @return this director, so that calls can be chained
+   */
+  public HDFSRegionDirector setCache(GemFireCache cache) {
+    this.cache = cache;
+    return this;
+  }
+
+  /**
+   * @return the cache set through {@link #setCache(GemFireCache)}, or null if
+   *         none has been set or the director has been reset
+   */
+  public GemFireCache getCache() {
+    return this.cache;
+  }
+  /**
+   * Registers a region with this director and caches the listener and HDFS
+   * store that are used each time an organizer is created for one of the
+   * region's buckets. If the region is already managed, its existing manager
+   * is closed and replaced.
+   *
+   * @param region region to manage; its full path is the registration key
+   * @param storeName name used to look up the store in {@link HDFSStoreDirector}
+   * @param listener listener handed to the new manager
+   * @return the newly created manager for the region
+   */
+  public synchronized HdfsRegionManager manageRegion(LocalRegion region, String storeName,
+      HoplogListener listener) {
+    final String fullPath = region.getFullPath();
+
+    // A manager already registered under this path means the region is being
+    // re-registered, presumably to change its listener or hdfs store. Drop the
+    // old manager before installing the new one.
+    if (regionManagerMap.get(fullPath) != null) {
+      clear(fullPath);
+    }
+
+    final HDFSStoreImpl hdfsStore = HDFSStoreDirector.getInstance().getHDFSStore(storeName);
+    final HdfsRegionManager created =
+        new HdfsRegionManager(region, hdfsStore, listener, getStatsFactory(), this);
+    regionManagerMap.put(fullPath, created);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Now managing region " + fullPath, logPrefix);
+    }
+    return created;
+  }
+  
+  /**
+   * Find the regions that are part of a particular HDFS store.
+   */
+  public Collection<String> getRegionsInStore(HDFSStore store) {
+    TreeSet<String> regions = new TreeSet<String>();
+    for(Map.Entry<String, HdfsRegionManager> entry : regionManagerMap.entrySet()) {
+      if(entry.getValue().getStore().equals(store)) {
+        regions.add(entry.getKey());
+      }
+    }
+    return regions;
+  }
+  
+  /**
+   * Returns the number of bucket organizers currently held for a region.
+   *
+   * @param regionPath full path of the region
+   * @return size of the region manager's bucket-to-organizer map
+   * @throws IllegalStateException if the region is not managed by this director
+   */
+  public int getBucketCount(String regionPath) {
+    final HdfsRegionManager regionManager = regionManagerMap.get(regionPath);
+    if (regionManager != null) {
+      return regionManager.bucketOrganizerMap.size();
+    }
+    throw new IllegalStateException("Region not initialized");
+  }
+  
+  public void closeWritersForRegion(String regionPath, int minSizeForFileRollover) throws IOException {
+    regionManagerMap.get(regionPath).closeWriters(minSizeForFileRollover);
+  }
+  /**
+   * Removes the manager of the given region, if there is one, and closes it,
+   * which closes every {@link HoplogOrganizer} of this region. This call is
+   * expected when a PR disowns a region.
+   *
+   * @param regionPath full path of the region to stop managing
+   */
+  public synchronized void clear(String regionPath) {
+    final HdfsRegionManager removed = regionManagerMap.remove(regionPath);
+    if (removed == null) {
+      return;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Closing hoplog region manager for " + regionPath, logPrefix);
+    }
+    removed.close();
+  }
+
+  /**
+   * Closes all region managers, organizers and hoplogs, stops the janitor and
+   * discards the singleton. This method should be called before closing the
+   * cache to gracefully release all resources. Does nothing if no director has
+   * been created.
+   */
+  public static synchronized void reset() {
+    final HDFSRegionDirector director = instance;
+    if (director == null) {
+      // nothing to reset
+      return;
+    }
+
+    director.janitor.shutdownNow();
+
+    for (String regionPath : director.regionManagerMap.keySet()) {
+      director.clear(regionPath);
+    }
+    director.cache = null;
+    instance = null;
+  }
+  
+  /**
+   * Terminates the current janitor task and schedules a new one. The rate of
+   * the new task is based on the value of the system property at that time.
+   * The old task stays scheduled on the executor but returns immediately once
+   * it has been terminated.
+   * <p>
+   * Does nothing if no director exists (before the first
+   * {@link #getInstance()} or after {@link #reset()}); a director created
+   * later reads the system property in its constructor anyway.
+   */
+  public static synchronized void resetJanitor() {
+    if (instance == null) {
+      // no director, hence no janitor to reset
+      return;
+    }
+    instance.janitorTask.terminate();
+    instance.janitorTask = instance.new JanitorTask();
+    long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
+        HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+    // unlike the constructor, the replacement task starts without initial delay
+    instance.janitor.scheduleWithFixedDelay(instance.janitorTask, 0, interval,
+        TimeUnit.SECONDS);
+  }
+  
+  /**
+   * Looks up the statistics object of a managed region.
+   *
+   * @param regionPath name of region for which stats object is desired
+   * @return {@link SortedOplogStatistics} instance associated with hdfs region
+   *         name. Null if region is not managed by director
+   */
+  public synchronized SortedOplogStatistics getHdfsRegionStats(String regionPath) {
+    final HdfsRegionManager regionManager = regionManagerMap.get(regionPath);
+    if (regionManager == null) {
+      return null;
+    }
+    return regionManager.getHdfsStats();
+  }
+  
+  /**
+   * Returns the statistics factory handed to new region managers, which is the
+   * distributed system of the cache set through {@link #setCache(GemFireCache)}.
+   * NOTE(review): {@code cache} is not null-checked here, so this presumably
+   * must only be reached after setCache() has been called -- confirm with callers.
+   */
+  private StatisticsFactory getStatsFactory() {
+    return cache.getDistributedSystem();
+  }
+
+  /**
+   * A helper class to manage region and its organizers
+   */
+  public static class HdfsRegionManager {
+    // name and store configuration of the region whose buckets are managed by this director.
+    private LocalRegion region;
+    private HDFSStoreImpl store;
+    private HoplogListener listener;
+    private volatile boolean closed = false;
+    private final int FILE_ROLLOVER_TASK_INTERVAL = Integer.parseInt
+        (System.getProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "60"));
+    
+    private SystemTimer hoplogCloseTimer = null;
+    
+    // instance of hdfs statistics object for this hdfs based region. This
+    // object will collect usage and performance related statistics.
+    private final SortedOplogStatistics hdfsStats;
+
+    /*
+     * An instance of organizer is created for each bucket of regionName region residing on this
+     * node. This member maps bucket id with its corresponding organizer instance. A lock is used to
+     * manage concurrent writes to the map.
+     */
+    private ConcurrentMap<Integer, HoplogOrganizer> bucketOrganizerMap;
+    
+    private HDFSRegionDirector hdfsRegionDirector;
+
+    /**
+     * Creates a manager with an empty bucket-to-organizer map and a new
+     * statistics object named after the region's full path.
+     *
+     * @param region region whose buckets are managed
+     * @param store HDFS store of the region
+     * @param listener
+     *          listener of change events like file creation and deletion
+     * @param statsFactory factory used to create the region statistics
+     * @param hdfsRegionDirector director that owns this manager; its cache is
+     *          used when the hoplog close timer is created
+     */
+    HdfsRegionManager(LocalRegion region, HDFSStoreImpl store,
+        HoplogListener listener, StatisticsFactory statsFactory, HDFSRegionDirector hdfsRegionDirector) {
+      this.region = region;
+      this.store = store;
+      this.listener = listener;
+      this.hdfsRegionDirector = hdfsRegionDirector;
+      this.bucketOrganizerMap = new ConcurrentHashMap<Integer, HoplogOrganizer>();
+      this.hdfsStats = new SortedOplogStatistics(statsFactory, "HDFSRegionStatistics", region.getFullPath());
+    }
+
+    /**
+     * Force-closes the current writer of every bucket organizer of this region
+     * whose writer has reached the given minimum size. Stops early, without
+     * error, as soon as the region's readiness check fails.
+     * <p>
+     * Every organizer is cast to {@link HDFSUnsortedHoplogOrganizer}, so this
+     * presumably must only be called for write-only regions -- confirm with
+     * callers.
+     *
+     * @param minSizeForFileRollover minimum writer size, passed through to
+     *          {@code synchronizedCloseWriter}, below which a writer is left open
+     * @throws IOException if closing a writer fails
+     */
+    public void closeWriters(int minSizeForFileRollover) throws IOException {
+      for (HoplogOrganizer organizer : bucketOrganizerMap.values()) {
+        try {
+          this.getRegion().checkReadiness();
+        } catch (Exception ignored) {
+          // Deliberate best effort: the readiness check failed, which
+          // presumably means the region or cache is going away, so there is
+          // no point in closing the remaining writers.
+          break;
+        }
+
+        ((HDFSUnsortedHoplogOrganizer) organizer).synchronizedCloseWriter(true, 0,
+            minSizeForFileRollover);
+      }
+    }
+
+    /**
+     * Creates the organizer for a bucket and records it in the organizer map.
+     * A write-only region gets an {@link HDFSUnsortedHoplogOrganizer}, any
+     * other region an {@link HdfsSortedOplogOrganizer}. For a write-only
+     * region the first call also starts the timer that periodically runs a
+     * {@link CloseTmpHoplogsTimerTask} for this manager.
+     *
+     * @param bucketId id of the bucket; must not already have an organizer
+     *          (checked with an assertion only)
+     * @return the new organizer
+     * @throws IOException if the organizer cannot be constructed
+     */
+    public synchronized <T extends PersistedEventImpl> HoplogOrganizer<T> create(int bucketId) throws IOException {
+      assert !bucketOrganizerMap.containsKey(bucketId);
+
+      HoplogOrganizer<?> organizer = region.getHDFSWriteOnly() 
+          ? new HDFSUnsortedHoplogOrganizer(this, bucketId) 
+          : new HdfsSortedOplogOrganizer(this, bucketId);
+
+      bucketOrganizerMap.put(bucketId, organizer);
+      // initialize a timer that periodically closes the hoplog writer if the 
+      // time for rollover has passed. It also has the responsibility to fix the files.  
+      if (this.region.getHDFSWriteOnly() && 
+          hoplogCloseTimer == null) {
+        hoplogCloseTimer = new SystemTimer(hdfsRegionDirector.
+            getCache().getDistributedSystem(), true);
+        
+        // schedule the task to fix the files that were not closed properly 
+        // last time. The period is computed in long arithmetic so that a large
+        // interval (in seconds) cannot overflow int when converted to millis.
+        hoplogCloseTimer.scheduleAtFixedRate(new CloseTmpHoplogsTimerTask(this), 
+            1000, FILE_ROLLOVER_TASK_INTERVAL * 1000L);
+        
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Scheduling hoplog rollover timer with interval "+ FILE_ROLLOVER_TASK_INTERVAL + 
+              " for hoplog organizer for " + region.getFullPath()
+              + ":" + bucketId + " " + organizer, logPrefix);
+        }
+      }
+      
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Constructed hoplog organizer for " + region.getFullPath()
+            + ":" + bucketId + " " + organizer, logPrefix);
+      }
+      return (HoplogOrganizer<T>) organizer;
+    }
+    
+    /**
+     * Records an organizer that was constructed elsewhere as the organizer of
+     * the given bucket.
+     *
+     * @param bucketId id of the bucket
+     * @param organizer organizer to register for the bucket
+     * @throws IllegalArgumentException if the bucket already has an organizer
+     */
+    public synchronized <T extends PersistedEventImpl> void addOrganizer(
+        int bucketId, HoplogOrganizer<T> organizer) {
+      final boolean alreadyRegistered = bucketOrganizerMap.containsKey(bucketId);
+      if (alreadyRegistered) {
+        throw new IllegalArgumentException();
+      }
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}added pre constructed organizer " + region.getFullPath()
+            + ":" + bucketId + " " + organizer, logPrefix);
+      }
+
+      bucketOrganizerMap.put(bucketId, organizer);
+    }
+
+    /**
+     * Marks this manager as closed, cancels the hoplog close timer of a
+     * write-only region if one was started, and closes the organizer of every
+     * bucket.
+     */
+    public void close() {
+      closed = true;
+
+      final boolean timerStarted = this.region.getHDFSWriteOnly() && hoplogCloseTimer != null;
+      if (timerStarted) {
+        hoplogCloseTimer.cancel();
+        hoplogCloseTimer = null;
+      }
+
+      for (int bucketId : bucketOrganizerMap.keySet()) {
+        close(bucketId);
+      }
+    }
+    
+    /**
+     * @return true once {@link #close()} has been called on this manager
+     */
+    public boolean isClosed() {
+      return closed;
+    }
+
+    /**
+     * Removes the organizer of a bucket from the map and closes it. An
+     * {@link IOException} raised while closing is only logged at debug level.
+     * Does nothing if the bucket has no organizer.
+     *
+     * @param bucketId id of the bucket whose organizer is closed
+     */
+    public synchronized void close(int bucketId) {
+      final HoplogOrganizer removed = bucketOrganizerMap.remove(bucketId);
+      if (removed == null) {
+        return;
+      }
+      try {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Closing hoplog organizer for " + region.getFullPath() + ":" + 
+              bucketId + " " + removed, logPrefix);
+        }
+        removed.close();
+      } catch (IOException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(logPrefix + "Error closing hoplog organizer for " + region.getFullPath() + ":" + bucketId, e);
+        }
+      }
+      //TODO abort compaction and flush requests for this region
+    }
+    
+    /**
+     * Converts a region path into the name of the region's folder. In order:
+     * every underscore is doubled, one leading slash is dropped, and every
+     * remaining slash becomes a single underscore. For example
+     * {@code "/a_b/c"} becomes {@code "a__b_c"}.
+     *
+     * @param regionPath full path of the region
+     * @return folder name derived from the path
+     */
+    public static String getRegionFolder(String regionPath) {
+      return regionPath
+          .replace("_", "__")      // escape existing underscores first
+          .replaceFirst("^/", "")  // get rid of the leading slash
+          .replace('/', '_');      // remaining slashes separate path elements
+    }
+
+    /**
+     * @return the folder name derived from the full path of this manager's
+     *         region, see {@link #getRegionFolder(String)}
+     */
+    public String getRegionFolder() {
+      return getRegionFolder(region.getFullPath());
+    }
+
+    /** @return the listener given to this manager at construction */
+    public HoplogListener getListener() {
+      return listener;
+    }
+
+    /** @return the HDFS store given to this manager at construction; may be null */
+    public HDFSStoreImpl getStore() {
+      return store;
+    }
+
+    /** @return the region whose buckets this manager handles */
+    public LocalRegion getRegion() {
+      return region;
+    }
+    
+    /** @return the statistics object created for this region */
+    public SortedOplogStatistics getHdfsStats() {
+      return hdfsStats;
+    }
+    
+    /**
+     * @return the {@code values()} collection of the bucket-to-organizer map,
+     *         not a copy
+     */
+    public Collection<HoplogOrganizer> getBucketOrganizers(){
+      return this.bucketOrganizerMap.values();
+    }
+
+    /**
+     * get the HoplogOrganizers only for the given set of buckets
+     */
+    public Collection<HoplogOrganizer> getBucketOrganizers(Set<Integer> buckets){
+      Set<HoplogOrganizer> result = new HashSet<HoplogOrganizer>();
+      for (Integer bucketId : buckets) {
+        result.add(this.bucketOrganizerMap.get(bucketId));
+      }
+      return result;
+    }
+
+    /**
+     * Closes this manager and then deletes all files of this region from
+     * HDFS. This method should be called after all members have destroyed
+     * their region in gemfire, so there should be no threads accessing these
+     * files. Only the close happens when the manager has no store.
+     *
+     * @throws IOException if the region folder still exists after the delete
+     *           attempt, or if the file system call itself fails
+     */
+    public void destroyData() throws IOException {
+      // Make sure everything is shut down and closed.
+      close();
+      if (store == null) {
+        return;
+      }
+
+      final Path regionDir = new Path(store.getHomeDir(), getRegionFolder());
+      final FileSystem hdfs = getStore().getFileSystem();
+
+      // Recursive delete. A false result is only treated as a failure when
+      // the folder is still there afterwards.
+      final boolean deleted = hdfs.delete(regionDir, true);
+      if (!deleted && hdfs.exists(regionDir)) {
+        throw new IOException("Unable to delete " + regionDir);
+      }
+    }
+
+    /**
+     * Runs {@code performMaintenance()} on the organizer of every bucket of
+     * this region.
+     *
+     * @throws IOException if an organizer fails; remaining organizers are not
+     *           visited in that case
+     */
+    public void performMaintenance() throws IOException {
+      for (HoplogOrganizer organizer : getBucketOrganizers()) {
+        organizer.performMaintenance();
+      }
+    }
+  }
+  
+  /**
+   * Periodic task that runs maintenance on every managed region. A failure in
+   * one region is logged and does not stop maintenance of the other regions.
+   */
+  private class JanitorTask implements Runnable {
+    // Set by terminate() on the thread that calls resetJanitor() and read by
+    // run() on the janitor thread, hence volatile so that the write is visible.
+    volatile boolean terminated = false;
+    @Override
+    public void run() {
+      if (terminated) {
+        return;
+      }
+      fineLog("Executing HDFS Region janitor task", null);
+      
+      Collection<HdfsRegionManager> regions = regionManagerMap.values();
+      for (HdfsRegionManager region : regions) {
+        fineLog("Maintaining region:" + region.getRegionFolder(), null);
+        try {
+          region.performMaintenance();
+        } catch (Throwable e) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR , region.getRegionFolder()));
+          logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, e.getMessage()));
+          fineLog(null, e);
+        }
+      }
+    }
+
+    /**
+     * Turns every later execution of this task into a no-op. The task is not
+     * removed from the executor.
+     */
+    public void terminate() {
+      terminated = true;
+    }
+  }
+  
+  /**
+   * Logs a message and/or throwable at debug level, if debug logging is
+   * enabled.
+   *
+   * @param message text to log; callers in this class pass null when only the
+   *          throwable matters
+   * @param e throwable to log with the message; may be null
+   */
+  protected static void fineLog(String message, Throwable e) {
+    if (!logger.isDebugEnabled()) {
+      return;
+    }
+    logger.debug(message, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
new file mode 100644
index 0000000..880ef3e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+
+/**
+ * HDFSStoreDirector manages all instances of {@link HDFSStoreImpl}, keyed by
+ * store name. A single director is shared through {@link #getInstance()}.
+ */
+public final class HDFSStoreDirector {
+  private final ConcurrentHashMap<String, HDFSStoreImpl> storeMap = new ConcurrentHashMap<String, HDFSStoreImpl>();
+
+  // singleton instance, created lazily by getInstance()
+  private static volatile HDFSStoreDirector instance;
+  
+  private HDFSStoreDirector() {
+    // instances are only handed out through getInstance()
+  }
+  
+  /**
+   * @return the shared director, created on first use
+   */
+  public static final HDFSStoreDirector getInstance() {
+    final HDFSStoreDirector existing = instance;
+    if (existing != null) {
+      return existing;
+    }
+    synchronized (HDFSStoreDirector.class)  {
+      if (instance == null) {
+        instance = new HDFSStoreDirector();
+      }
+      return instance;
+    }
+  }
+
+  /**
+   * Registers a store under its name, replacing any store already registered
+   * under that name. Called when the region is created.
+   */
+  public final void addHDFSStore(HDFSStoreImpl hdfsStore){
+    final String storeName = hdfsStore.getName();
+    this.storeMap.put(storeName, hdfsStore); 
+  }
+  
+  /**
+   * @return the store registered under the given name, or null if there is none
+   */
+  public final HDFSStoreImpl getHDFSStore(String hdfsStoreName) {
+    return this.storeMap.get(hdfsStoreName);
+  }
+  
+  /**
+   * Unregisters the store with the given name. The store itself is not closed.
+   */
+  public final void removeHDFSStore(String hdfsStoreName) {
+    this.storeMap.remove(hdfsStoreName);
+  } 
+  
+  /**
+   * Closes every registered store and then empties the registry.
+   */
+  public void closeHDFSStores() {
+    for (HDFSStoreImpl registered : this.storeMap.values()) {
+      registered.close();
+    }
+    this.storeMap.clear();
+  }
+
+  /**
+   * @return a new list holding the stores registered at the time of the call
+   */
+   public ArrayList<HDFSStoreImpl> getAllHDFSStores() {
+    return new ArrayList<HDFSStoreImpl>(this.storeMap.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
new file mode 100644
index 0000000..cbb35cb
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Manages unsorted Hoplog files for a bucket (Streaming Ingest option). An instance per bucket 
+ * will exist in each PR
+ * 
+ *
+ */
+public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<UnsortedHoplogPersistedEvent> {
+  public static final String HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
+      + SEQ_HOPLOG_EXTENSION + "|" + TEMP_HOPLOG_EXTENSION + ")";
+  public static final Pattern HOPLOG_PATTERN = Pattern.compile(HOPLOG_REGEX);
+  protected static String TMP_FILE_NAME_REGEX = HOPLOG_NAME_REGEX + SEQ_HOPLOG_EXTENSION + TEMP_HOPLOG_EXTENSION + "$";
+  protected static final Pattern patternForTmpHoplog = Pattern.compile(TMP_FILE_NAME_REGEX);
+  
+   volatile private HoplogWriter writer;
+   volatile private Hoplog currentHoplog;
+   
+   volatile private long lastFlushTime = System.currentTimeMillis();
+   
+   volatile private boolean abortFlush = false;
+   private FileSystem fileSystem;
+   
+   public HDFSUnsortedHoplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
+    super(region, bucketId);
+    writer = null;
+    sequence = new AtomicInteger(0);
+
+    fileSystem = store.getFileSystem();
+    if (! fileSystem.exists(bucketPath)) {
+      return;
+    }
+    
+    FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All valid hoplog files must match the regex
+        Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
+        return matcher.matches();
+      }
+    });
+
+    if (validHoplogs != null && validHoplogs.length > 0) {
+      for (FileStatus file : validHoplogs) {
+        // account for the disk used by this file
+        incrementDiskUsage(file.getLen());
+      }
+    }
+
+  }
+  
+    /**
+     * Closes the organizer: runs the superclass close, flags any flush as
+     * aborted and force-closes the current writer.
+     *
+     * @throws IOException if closing the writer fails in a way that
+     *           {@code closeCurrentWriter()} does not suppress
+     */
+    @Override
+    public void close() throws IOException {
+      super.close();
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Closing the hoplog organizer and the open files", logPrefix);
+      }
+      // Abort the flush first so that the current writer can be closed
+      // immediately.
+      abortFlush = true;
+      synchronizedCloseWriter(true, 0, 0);
+    }
+    
+    
+    /**
+     * Flushes a batch of queued events to HDFS. A writer on a new temporary
+     * hoplog is created if none is open. Every event is appended as raw key
+     * plus serialized value. Afterwards the file is rolled over (writer closed
+     * and file made legitimate) when it has reached the store's rollover size
+     * or the rollover interval has passed since the last flush; otherwise the
+     * appended data is hsync'ed.
+     * <p>
+     * Synchronization ensures that the writer is not closed when flush is
+     * happening. To abort the flush, abortFlush needs to be set.
+     *
+     * @param bufferIter iterator over the events of the batch
+     * @param count number of events, passed to the writer when it is created
+     * @throws IOException if writing fails; the current writer is closed first
+     *           so that the next attempt starts with a new file
+     * @throws CacheClosedException if the flush was aborted
+     * @throws ForceReattemptException 
+     */
+    @Override
+    public synchronized void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, final int count)
+        throws IOException, ForceReattemptException {
+      assert bufferIter != null;
+      
+      if (abortFlush)
+        throw new CacheClosedException("Either the region has been cleared " +
+            "or closed. Aborting the ongoing flush operation.");
+      if (logger.isDebugEnabled())
+        logger.debug("{}Initializing flush operation", logPrefix);
+      
+      // variables for updating stats
+      long start = stats.getFlush().begin();
+      int byteCount = 0;
+      if (writer == null) {
+        // Hoplogs of sequence files are always created with a 0 sequence number
+        currentHoplog = getTmpSortedOplog(0, SEQ_HOPLOG_EXTENSION);
+        try {
+          writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
+            @Override
+            public HoplogWriter call() throws Exception {
+              return currentHoplog.createWriter(count);
+            }
+          });
+        } catch (Exception e) {
+          if (e instanceof IOException) {
+            throw (IOException)e;
+          }
+          throw new IOException(e);
+        }
+      }
+      // seconds elapsed since the writer was last closed
+      long timeSinceLastFlush = (System.currentTimeMillis() - lastFlushTime)/1000 ;
+      
+      try {
+        /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
+        //HeapDataOutputStream out = new HeapDataOutputStream();
+        while (bufferIter.hasNext()) {
+          HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
+          if (abortFlush) {
+            stats.getFlush().end(byteCount, start);
+            throw new CacheClosedException("Either the region has been cleared " +
+                "or closed. Aborting the ongoing flush operation.");
+          }
+          QueuedPersistentEvent item = bufferIter.next();
+          item.toHoplogEventBytes(out);
+          byte[] valueBytes = out.toByteArray();
+          writer.append(item.getRawKey(), valueBytes);
+          // add key length and value length to stats byte counter
+          byteCount += (item.getRawKey().length + valueBytes.length);
+          /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
+          //out.clearForReuse();
+        }
+        // ping secondaries before making the file a legitimate file to ensure 
+        // that in case of split brain, no other vm has taken up as primary. #50110. 
+        if (!abortFlush)
+          pingSecondaries();
+        // append completed. If the file is to be rolled over, 
+        // close writer and rename the file to a legitimate name.
+        // Else, sync the already written data with HDFS nodes. 
+        // The limit is computed in long arithmetic: with int arithmetic a
+        // rollover size of 2048 or more overflows to zero or a negative
+        // limit, which forces a rollover on every flush.
+        long maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024L * 1024L;  
+        int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); 
+        if (writer.getCurrentSize() >= maxFileSize || 
+            timeSinceLastFlush >= fileRolloverInterval) {
+          closeCurrentWriter();
+        }
+        else {
+          // if flush is not aborted, hsync the batch. It ensures that 
+          // the batch has reached HDFS and we can discard it. 
+          if (!abortFlush)
+            writer.hsync();
+        }
+      } catch (IOException e) {
+        stats.getFlush().error(start);
+        // as there is an exception, it can be probably be a file specific problem.
+        // close the current file to avoid any file specific issues next time  
+        closeCurrentWriter();
+        // throw the exception so that async queue will dispatch the same batch again 
+        throw e;
+      } 
+      
+      stats.getFlush().end(byteCount, start);
+    }
+    
+    /**
+     * Closes the current writer if it is due for rollover. Nothing happens
+     * when the writer holds less than {@code minsizeforrollover * 1024} bytes
+     * (a missing writer counts as size 0). Otherwise the writer is closed when
+     * it has reached the store's rollover size, when the rollover interval
+     * has passed, or when {@code forceClose} is set.
+     * <p>
+     * Synchronization ensures that the writer is not closed when flush is
+     * happening.
+     *
+     * @param forceClose close the writer regardless of size and time
+     * @param timeSinceLastFlush time since the last flush, compared against the
+     *          store's rollover interval
+     * @param minsizeforrollover minimum writer size, scaled by 1024, below
+     *          which the writer is left open
+     * @throws IOException if closing the writer fails
+     */
+    synchronized void synchronizedCloseWriter(boolean forceClose, 
+        long timeSinceLastFlush, int minsizeforrollover) throws IOException { 
+      long writerSize = 0;
+      if (writer != null){
+        writerSize = writer.getCurrentSize();
+      }
+      
+      if (writerSize < (minsizeforrollover * 1024L))
+        return;
+      
+      // long arithmetic: int arithmetic overflows to zero or a negative limit
+      // for a rollover size of 2048 or more
+      long maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024L * 1024L;  
+      int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); 
+      if (writerSize >= maxFileSize || 
+          timeSinceLastFlush >= fileRolloverInterval || forceClose) {
+        closeCurrentWriter();
+      }
+    }
+        
+    
+    /**
+     * Closes the current writer so that next time a new hoplog can 
+     * be created. Also, fixes any tmp hoplogs. 
+     * 
+     * @throws IOException
+     */
+    void closeCurrentWriter() throws IOException {
+      
+      if (writer != null) {
+        // If this organizer is closing, it is ok to ignore exceptions here
+        // because CloseTmpHoplogsTimerTask
+        // on another member may have already renamed the hoplog
+        // fixes bug 49141
+        boolean isClosing = abortFlush;
+        try {
+          incrementDiskUsage(writer.getCurrentSize());
+        } catch (IOException e) {
+          if (!isClosing) {
+            throw e;
+          }
+        }
+        if (logger.isDebugEnabled())
+          logger.debug("{}Closing hoplog " + currentHoplog.getFileName(), logPrefix);
+        try{
+          writer.close();
+          makeLegitimate(currentHoplog);
+        } catch (IOException e) {
+          if (!isClosing) {
+            logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
+            throw e;
+          }
+        } finally {
+          writer = null;
+          lastFlushTime = System.currentTimeMillis();
+        }
+      }
+      else
+        lastFlushTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Closes any open writer and marks the hoplogs of this bucket for
+     * deletion. Flushing is flagged as aborted only while the writer is being
+     * closed; the previous value of the flag is restored afterwards. A failure
+     * to close the writer is logged as a warning and does not stop the
+     * deletion marking.
+     *
+     * @throws IOException if marking the hoplogs for deletion fails
+     */
+    @Override
+    public void clear() throws IOException {
+      final boolean savedAbortFlush = abortFlush;
+
+      // abort the flush so that the current writer can be closed immediately
+      abortFlush = true;
+      try {
+        // closes the existing writer, if there is one
+        synchronizedCloseWriter(true, 0, 0);
+      } catch (IOException e) {
+        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
+      }
+      // re-enable flushing as it was before
+      abortFlush = savedAbortFlush;
+
+      markHoplogsForDeletion();
+    }
+  
+    /**
+     * Intentionally does nothing for this organizer at the moment.
+     */
+    @Override
+    public void performMaintenance() {
+      // TODO remove the timer for tmp file conversion. Use this instead
+    }
+
+    /**
+     * No compaction is started by this organizer.
+     *
+     * @param isMajor ignored
+     * @return always {@code null}
+     */
+    @Override
+    public Future<CompactionStatus> forceCompaction(boolean isMajor) {
+      return null;
+    }
+
+    /**
+     * Wraps the given path in a {@link SequenceFileHoplog} that uses this
+     * organizer's file system and stats.
+     *
+     * @param hoplogPath location of the hoplog file
+     * @return a new {@code SequenceFileHoplog} for {@code hoplogPath}
+     * @throws IOException declared by the overridden method
+     */
+    @Override
+    protected Hoplog getHoplog(Path hoplogPath) throws IOException {
+      return new SequenceFileHoplog(fileSystem, hoplogPath, stats);
+    }
+  
+  /**
+   * Fixes the size of hoplogs that were not closed properly last time.
+   * Such hoplogs are *.tmphop files. Every file in the bucket whose name
+   * matches {@code patternForTmpHoplog} is handed to {@code makeLegitimate},
+   * except for directories and the hoplog that is currently being written.
+   * According to the original author, opening and closing such a file fixes
+   * its size, after which it is renamed to *.hop.
+   *
+   * <p>A failure to fix one file is logged and does not stop the remaining
+   * files from being processed.
+   *
+   * @param fs file system used for listing and fixing; deliberately passed in
+   *        by the caller, see the comment in the method body
+   * @throws IOException if listing the bucket fails
+   * @throws ForceReattemptException presumably raised by
+   *         {@code pingSecondaries} - confirm against its declaration
+   */
+  void identifyAndFixTmpHoplogs(FileSystem fs) throws IOException, ForceReattemptException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Fixing temporary hoplogs", logPrefix);
+    }
+
+    // A different filesystem is passed to this function for the following reason:
+    // For HDFS, if a file wasn't closed properly last time,
+    // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
+    // FSNamesystem.recoverLeaseInternal function gets called.
+    // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file,
+    // created using the same FileSystem object. This is a bug and is being tracked at:
+    // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
+    //
+    // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug,
+    // we create a new file system for the timer task so that it does not encounter the bug.
+
+    FileStatus tmpHoplogs[] = FSUtils.listStatus(fs, fs.makeQualified(bucketPath), new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // only files whose name matches the temporary hoplog pattern are selected
+        Matcher matcher = patternForTmpHoplog.matcher(file.getName());
+        return matcher.matches();
+      }
+    });
+
+    if (tmpHoplogs == null || tmpHoplogs.length == 0) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}No files to fix", logPrefix);
+      }
+      return;
+    }
+    // ping secondaries so that in case of split brain, no other vm has taken up
+    // as primary. #50110.
+    pingSecondaries();
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Files to fix " + tmpHoplogs.length, logPrefix);
+    }
+
+    // get the current hoplog name. We need to ignore current hoplog while fixing.
+    String currentHoplogName = null;
+    if (currentHoplog != null) {
+      currentHoplogName = currentHoplog.getFileName();
+    }
+
+    for (FileStatus tmpHoplog : tmpHoplogs) {
+      // Skip directories
+      if (tmpHoplog.isDirectory()) {
+        continue;
+      }
+
+      final Path p = tmpHoplog.getPath();
+
+      if (p.getName().equals(currentHoplogName)) {
+        if (logger.isDebugEnabled()) {
+          // the "{}" placeholder is required, otherwise logPrefix is dropped from the message
+          logger.debug("{}Skipping current file: " + p.getName(), logPrefix);
+        }
+        continue;
+      }
+
+      SequenceFileHoplog hoplog = new SequenceFileHoplog(fs, p, stats);
+      try {
+        makeLegitimate(hoplog);
+        logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " was a temporary " +
+            "hoplog because the node managing it wasn't shutdown properly last time. Fixed the hoplog name."));
+      } catch (IOException e) {
+        // best effort: report the failure and carry on with the next file
+        logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " is still a temporary " +
+            "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
+            "change the hoplog name because an exception was thrown while fixing it. " + e));
+      }
+    }
+  }
+  
+  private FileStatus[] getExpiredHoplogs() throws IOException {
+    FileStatus files[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All expired hoplog end with expire extension and must match the valid file regex
+        String fileName = file.getName();
+        if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
+          return false;
+        }
+        return true;
+      }
+    });
+    return files;
+  }
+  /**
+   * locks sorted oplogs collection, removes oplog and renames for deletion later
+   * @throws IOException 
+   */
+  private void markHoplogsForDeletion() throws IOException {
+    
+    ArrayList<IOException> errors = new ArrayList<IOException>();
+    FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All valid hoplog files must match the regex
+        Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
+        return matcher.matches();
+      }
+    });
+    
+    FileStatus[] expired = getExpiredHoplogs();
+    validHoplogs = filterValidHoplogs(validHoplogs, expired);
+
+    if (validHoplogs == null || validHoplogs.length == 0) {
+      return;
+    }
+    for (FileStatus fileStatus : validHoplogs) {
+      try {
+        addExpiryMarkerForAFile(getHoplog(fileStatus.getPath()));
+      } catch (IOException e) {
+        // even if there is an IO error continue removing other hoplogs and
+        // notify at the end
+        errors.add(e);
+      }
+    }
+    
+    if (!errors.isEmpty()) {
+      for (IOException e : errors) {
+        logger.warn(LocalizedStrings.HOPLOG_HOPLOG_REMOVE_FAILED, e);
+      }
+    }
+  }
+  
+  /**
+   * Not available on this organizer.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Compactor getCompactor() {
+    final String organizerName = getClass().getSimpleName();
+    throw new UnsupportedOperationException("Not supported for " + organizerName);
+  }
+  
+    /**
+     * Not available on this organizer; both arguments are ignored.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+  public HoplogIterator<byte[], UnsortedHoplogPersistedEvent> scan(
+      long startOffset, long length) throws IOException {
+    final String organizerName = getClass().getSimpleName();
+    throw new UnsupportedOperationException("Not supported for " + organizerName);
+  }
+
+  /**
+   * @return the current value of {@code lastFlushTime}
+   */
+  public long getLastFlushTime() {
+    return lastFlushTime;
+  }
+  
+  /**
+   * @return the store's write-only file rollover interval, widened to a long
+   */
+  public long getfileRolloverInterval() {
+    return this.store.getWriteOnlyFileRolloverInterval();
+  }
+
+  /**
+   * Not available on this organizer.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public long getLastMajorCompactionTimestamp() {
+    throw new UnsupportedOperationException();
+  }
+
+}