You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:08 UTC

[22/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
new file mode 100644
index 0000000..82e2bf9
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * An event that is persisted in HDFS. Because only some of the EntryEventImpl
+ * fields need to be persisted, this class captures them and overrides the
+ * toData and fromData functions.
+ *
+ * Subclasses represent the different types of persisted events: sorted
+ * vs. unsorted, and the events kept in the region queue, which also need
+ * to hold the region key.
+ */
+public abstract class PersistedEventImpl {
+  protected Operation op = Operation.UPDATE;
+  
+  protected Object valueObject;
+
+  /**
+   * A field with flags describing the event
+   */
+  protected byte flags;
+
+  // Flags indicating the type of the value. If the value is not a byte
+  // array or an object, it is an internal delta.
+  private static final byte VALUE_IS_BYTE_ARRAY = 0x01;
+  private static final byte VALUE_IS_OBJECT = (VALUE_IS_BYTE_ARRAY << 1);
+  private static final byte POSSIBLE_DUPLICATE = (VALUE_IS_OBJECT << 1);
+  private static final byte HAS_VERSION_TAG = (POSSIBLE_DUPLICATE << 1);
+  
+
+  /** for deserialization */
+  public PersistedEventImpl() {
+  }
+  
+  public PersistedEventImpl(Object value, Operation op, byte valueIsObject,
+      boolean isPossibleDuplicate, boolean hasVersionTag) throws IOException,
+      ClassNotFoundException {
+    this.op = op;
+    this.valueObject = value;
+    setFlag(VALUE_IS_BYTE_ARRAY, valueIsObject == 0x00);
+    setFlag(VALUE_IS_OBJECT, valueIsObject == 0x01);
+    setFlag(POSSIBLE_DUPLICATE, isPossibleDuplicate);
+    setFlag(HAS_VERSION_TAG, hasVersionTag);
+  }
+  
+  private void setFlag(byte flag, boolean set) {
+    flags = (byte) (set ?  flags | flag :  flags & ~flag);
+  }
+  
+  private boolean getFlag(byte flag) {
+    return (flags & flag) != 0x0;
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    out.writeByte(this.op.ordinal);
+    out.writeByte(this.flags);
+    
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
+      DataSerializer.writeByteArray((byte[])this.valueObject, out);
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      if(valueObject instanceof CachedDeserializable) {
+        CachedDeserializable cd = (CachedDeserializable)valueObject;
+        DataSerializer.writeObjectAsByteArray(cd.getValue(), out);
+      } else {
+        DataSerializer.writeObjectAsByteArray(valueObject, out);
+      }
+    } else {
+      DataSerializer.writeObject(valueObject, out);
+    }
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.op = Operation.fromOrdinal(in.readByte());
+    this.flags = in.readByte();
+    
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
+      this.valueObject = DataSerializer.readByteArray(in);
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      byte[] newValueBytes = DataSerializer.readByteArray(in);
+      if(newValueBytes == null) {
+        this.valueObject = null;
+      } else {
+        if(CachedDeserializableFactory.preferObject()) {
+          this.valueObject =  EntryEventImpl.deserialize(newValueBytes);
+        } else {
+          this.valueObject = CachedDeserializableFactory.create(newValueBytes);
+        }
+      }
+    } else {
+      this.valueObject = DataSerializer.readObject(in);
+    }
+    
+  }
+  
+  /**
+   * Return the timestamp of this event. Depending on the subclass,
+   * this may be part of the version tag, or a separate field.
+   */
+  public abstract long getTimstamp();
+
+  protected boolean hasVersionTag() {
+    return getFlag(HAS_VERSION_TAG);
+  }
+
+  public Operation getOperation() {
+    return this.op;
+  }
+  
+  public Object getValue() {
+    return this.valueObject;
+  }
+  
+  public boolean isPossibleDuplicate() {
+    return getFlag(POSSIBLE_DUPLICATE);
+  }
+
+  /**
+   * Returns the deserialized value.
+   */
+  public Object getDeserializedValue() throws IOException, ClassNotFoundException {
+    Object retVal = null;
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
+      // value is a byte array
+      retVal = this.valueObject;
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      if(valueObject instanceof CachedDeserializable) {
+        retVal = ((CachedDeserializable)valueObject).getDeserializedForReading();
+      } else {
+        retVal = valueObject;
+      }
+    } else {
+      // value is an object
+      retVal = this.valueObject;
+    }
+    return retVal;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder(PersistedEventImpl.class.getSimpleName());
+    str.append("@").append(System.identityHashCode(this))
+    .append(" op:").append(op)
+    .append(" valueObject:").append(valueObject)
+    .append(" isPossibleDuplicate:").append(getFlag(POSSIBLE_DUPLICATE));
+    return str.toString();
+  }
+
+  public void copy(PersistedEventImpl usersValue) {
+    this.op = usersValue.op;
+    this.valueObject = usersValue.valueObject;
+    this.flags = usersValue.flags;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    // keySize and versionTag are not counted here; the subclasses that
+    // store them add their sizes on top of this base estimate
+    int size = 0;
+
+    // value length
+    size += valueSize;
+
+    // one byte for op and one byte for flag
+    size += 2;
+    
+    return size;
+  }
+}
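
The four flag constants above pack independent booleans into the single
"flags" byte via shifted bit masks. A minimal standalone sketch of the same
technique (the demo class is illustrative, not part of the commit):

    public class FlagByteDemo {
      private static final byte VALUE_IS_BYTE_ARRAY = 0x01;                  // bit 0
      private static final byte VALUE_IS_OBJECT = VALUE_IS_BYTE_ARRAY << 1;  // bit 1
      private static final byte POSSIBLE_DUPLICATE = VALUE_IS_OBJECT << 1;   // bit 2

      private byte flags;

      void setFlag(byte flag, boolean set) {
        // OR the bit in to set it, AND with the complement to clear it
        flags = (byte) (set ? flags | flag : flags & ~flag);
      }

      boolean getFlag(byte flag) {
        return (flags & flag) != 0;
      }

      public static void main(String[] args) {
        FlagByteDemo d = new FlagByteDemo();
        d.setFlag(POSSIBLE_DUPLICATE, true);
        System.out.println(d.getFlag(POSSIBLE_DUPLICATE)); // true
        System.out.println(d.getFlag(VALUE_IS_OBJECT));    // false
      }
    }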

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
new file mode 100644
index 0000000..bd7994c
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface QueuedPersistentEvent {
+  
+  public byte[] getRawKey();
+  
+  public void toHoplogEventBytes(DataOutput out) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
new file mode 100644
index 0000000..b97bdb7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracks flushes using a queue of latches.
+ * 
+ */
+public class SignalledFlushObserver implements FlushObserver {
+  private static class FlushLatch extends CountDownLatch {
+    private final long seqnum;
+    
+    public FlushLatch(long seqnum) {
+      super(1);
+      this.seqnum = seqnum;
+    }
+    
+    public long getSequence() {
+      return seqnum;
+    }
+  }
+  
+  // assume the number of outstanding flush requests is small so we don't
+  // need to organize by seqnum
+  private final List<FlushLatch> signals;
+  
+  private final AtomicLong eventsReceived;
+  private final AtomicLong eventsDelivered;
+  
+  public SignalledFlushObserver() {
+    signals = new ArrayList<FlushLatch>();
+    eventsReceived = new AtomicLong(0);
+    eventsDelivered = new AtomicLong(0);
+  }
+  
+  @Override
+  public boolean shouldDrainImmediately() {
+    synchronized (signals) {
+      return !signals.isEmpty();
+    }
+  }
+  
+  @Override
+  public AsyncFlushResult flush() {
+    final long seqnum = eventsReceived.get();
+    synchronized (signals) {
+      final FlushLatch flush;
+      if (seqnum <= eventsDelivered.get()) {
+        flush = null;
+      } else {
+        flush = new FlushLatch(seqnum);
+        signals.add(flush);
+      }
+      
+      return new AsyncFlushResult() {
+        @Override
+        public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
+          return flush == null || flush.await(timeout, unit);
+        }
+      };
+    }
+  }
+
+  /**
+   * Invoked when an event is received.
+   */
+  public void push() {
+    eventsReceived.incrementAndGet();
+  }
+
+  /**
+   * Invoked when a batch has been dispatched.
+   */
+  public void pop(int count) {
+    long highmark = eventsDelivered.addAndGet(count);
+    synchronized (signals) {
+      for (ListIterator<FlushLatch> iter = signals.listIterator(); iter.hasNext(); ) {
+        FlushLatch flush = iter.next();
+        if (flush.getSequence() <= highmark) {
+          flush.countDown();
+          iter.remove();
+        }
+      }
+    }
+  }
+  
+  /**
+   * Invoked when the queue is cleared.
+   */
+  public void clear() {
+    synchronized (signals) {
+      for (FlushLatch flush : signals) {
+        flush.countDown();
+      }
+
+      signals.clear();
+      eventsReceived.set(0);
+      eventsDelivered.set(0);
+    }
+  }
+}
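
For orientation, a hedged usage sketch of the observer lifecycle implied
above: push() on receipt, flush() to obtain a completion handle, pop() when a
batch is dispatched. AsyncFlushResult is assumed here to be nested in
FlushObserver, which is not part of this diff:

    SignalledFlushObserver observer = new SignalledFlushObserver();

    observer.push();   // event 1 received
    observer.push();   // event 2 received

    // Ask for a flush; the handle completes once every event received so
    // far has been dispatched.
    FlushObserver.AsyncFlushResult pending = observer.flush();

    observer.pop(2);   // the drainer dispatched a batch of two events

    // Returns true immediately: pop(2) counted the latch down.
    boolean flushed = pending.waitForFlush(30, TimeUnit.SECONDS);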

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..c725ce5
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file.
+ */
+public class SortedHDFSQueuePersistedEvent extends SortedHoplogPersistedEvent implements QueuedPersistentEvent {
+
+  /** key stored in serialized form */
+  protected byte[] keyBytes = null;
+
+  public SortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in)
+      throws IOException, ClassNotFoundException {
+    this(in.getSerializedValue(), in.getOperation(), in.getValueIsObject(),
+        in.getPossibleDuplicate(), in.getVersionTag(), in.getSerializedKey(),
+        in.getCreationTime());
+  }
+
+  public SortedHDFSQueuePersistedEvent(Object valueObject, Operation operation,
+      byte valueIsObject, boolean possibleDuplicate, VersionTag versionTag,
+      byte[] serializedKey, long timestamp) throws ClassNotFoundException, IOException {
+    super(valueObject, operation, valueIsObject, possibleDuplicate, versionTag, timestamp);
+    this.keyBytes = serializedKey;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeByteArray(this.keyBytes, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.keyBytes = DataSerializer.readByteArray(in);
+  }
+
+  @Override
+  public void toHoplogEventBytes(DataOutput out) throws IOException {
+    super.toData(out);
+  }
+
+  @Override
+  public byte[] getRawKey() {
+    return this.keyBytes;
+  }
+
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    
+    int size = SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+    
+    size += keySize;
+    
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..e8be7b8
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persistent event that is stored in a sorted hoplog. In addition
+ * to the fields of PersistedEventImpl, this event has a version tag.
+ *
+ * This class should only be serialized by directly calling toData,
+ * which is why it does not implement DataSerializable.
+ */
+public class SortedHoplogPersistedEvent extends PersistedEventImpl {
+  /** version tag for concurrency checks */
+  protected VersionTag versionTag;
+
+  /** timestamp of the event. Used when version checks are disabled*/
+  protected long timestamp;
+
+  public SortedHoplogPersistedEvent(Object valueObject, Operation operation,
+      byte valueIsObject, boolean possibleDuplicate, VersionTag tag, long timestamp) throws ClassNotFoundException, IOException {
+    super(valueObject, operation, valueIsObject, possibleDuplicate, tag != null);
+    this.versionTag = tag;
+    this.timestamp = timestamp;
+  }
+
+  public SortedHoplogPersistedEvent() {
+    //for deserialization
+  }
+
+  @Override
+  public long getTimstamp() {
+    return versionTag == null ? timestamp : versionTag.getVersionTimeStamp();
+  }
+  
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    if (versionTag == null) {
+      out.writeLong(timestamp);
+    } else {
+      //TODO optimize these
+      DataSerializer.writeObject(this.versionTag, out);
+    }
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    if (hasVersionTag()) {
+      this.versionTag = (VersionTag)DataSerializer.readObject(in);
+    } else {
+      this.timestamp = in.readLong();
+    }
+  }
+  
+  /**
+   * @return the concurrency versioning tag for this event, if any
+   */
+  public VersionTag getVersionTag() {
+    return this.versionTag;
+  }
+  
+  public static SortedHoplogPersistedEvent fromBytes(byte[] val)
+      throws IOException, ClassNotFoundException {
+    ByteArrayDataInput in = new ByteArrayDataInput();
+    in.initialize(val, null);
+    SortedHoplogPersistedEvent event = new SortedHoplogPersistedEvent();
+    event.fromData(in);
+    return event;
+  }
+  
+  @Override
+  public void copy(PersistedEventImpl usersValue) {
+    super.copy(usersValue);
+    this.versionTag = ((SortedHoplogPersistedEvent) usersValue).versionTag;
+    this.timestamp = ((SortedHoplogPersistedEvent) usersValue).timestamp;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+    
+    if (versionTag != null) {
+      size +=  versionTag.getSizeInBytes();
+    } else {
+      // size of Timestamp
+      size += 8;
+    }
+    
+    return size;
+  }
+}
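
The getSizeInBytes overloads compose additively: PersistedEventImpl counts
the value bytes plus two bytes for op and flags, this class adds either the
version tag size or an 8-byte timestamp, and SortedHDFSQueuePersistedEvent
adds the key on top. A worked example with made-up sizes:

    int keySize = 16;    // serialized key bytes (example value)
    int valueSize = 100; // serialized value bytes (example value)

    int base = valueSize + 2;      // PersistedEventImpl: value + op + flags = 102
    int sorted = base + 8;         // no version tag, so + timestamp = 110
    int queued = sorted + keySize; // SortedHDFSQueuePersistedEvent: + key = 126
    // with a version tag, the 8 is replaced by versionTag.getSizeInBytes()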

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..93d596b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file.
+ */
+public class UnsortedHDFSQueuePersistedEvent extends UnsortedHoplogPersistedEvent implements QueuedPersistentEvent {
+  
+  /**the bytes of the key for this entry */
+  protected byte[] keyBytes = null;
+  
+  public UnsortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in)
+      throws IOException, ClassNotFoundException {
+    super(in.getValue(), in.getOperation(), in.getValueIsObject(), in.getPossibleDuplicate(),
+        in.getVersionTimeStamp() == 0 ? in.getCreationTime() : in.getVersionTimeStamp());
+    this.keyBytes = in.getSerializedKey();
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeByteArray(this.keyBytes, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.keyBytes = DataSerializer.readByteArray(in);
+  }
+  
+  @Override
+  public void toHoplogEventBytes(DataOutput out) throws IOException {
+    super.toData(out);
+  }
+  
+  @Override
+  public byte[] getRawKey() {
+    return this.keyBytes;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    
+    int size = UnsortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+    
+    size += keySize;
+    
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..9b9a04d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persisted event that is stored in an unsorted (sequential) hoplog. This
+ * does not have a version stamp, just a timestamp for the entry.
+ *
+ * This class should only be serialized by calling toData directly, which
+ * is why it does not implement DataSerializable.
+ */
+public class UnsortedHoplogPersistedEvent extends PersistedEventImpl {
+  long timestamp;
+
+  public UnsortedHoplogPersistedEvent() {
+    //for deserialization
+  }
+
+  public UnsortedHoplogPersistedEvent(Object value, Operation op,
+      byte valueIsObject, boolean isPossibleDuplicate, long timestamp) throws IOException,
+      ClassNotFoundException {
+    super(value, op, valueIsObject, isPossibleDuplicate, false/*hasVersionTag*/);
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public long getTimstamp() {
+    return timestamp;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeLong(timestamp, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.timestamp = DataSerializer.readLong(in);
+  }
+  
+  public static UnsortedHoplogPersistedEvent fromBytes(byte[] val)
+      throws IOException, ClassNotFoundException {
+    ByteArrayDataInput in = new ByteArrayDataInput();
+    in.initialize(val, null);
+    UnsortedHoplogPersistedEvent event = new UnsortedHoplogPersistedEvent();
+    event.fromData(in);
+    return event;
+  }
+  
+  @Override
+  public void copy(PersistedEventImpl usersValue) {
+    super.copy(usersValue);
+    this.timestamp = ((UnsortedHoplogPersistedEvent) usersValue).timestamp;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+    
+    // size of Timestamp
+    size += 8;
+    
+    return size;
+  }
+}
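
As a sanity check on the wire format, a hedged round-trip sketch pairing
toData with fromBytes (assumes geode-core internals on the classpath and a
method that declares IOException and ClassNotFoundException; the value and
operation are arbitrary):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);

    UnsortedHoplogPersistedEvent event = new UnsortedHoplogPersistedEvent(
        "sample-value",             // value
        Operation.CREATE,           // op
        (byte) 0x01,                // valueIsObject: serialize as an object
        false,                      // isPossibleDuplicate
        System.currentTimeMillis());
    event.toData(out);

    UnsortedHoplogPersistedEvent copy =
        UnsortedHoplogPersistedEvent.fromBytes(bytes.toByteArray());
    assert copy.getTimstamp() == event.getTimstamp();
    assert copy.getOperation() == event.getOperation();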

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
new file mode 100644
index 0000000..d2fdbe7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+
+import com.gemstone.gemfire.internal.hll.ICardinality;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.CompressionType;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Abstract class for {@link Hoplog} with common functionality
+ */
+public abstract class AbstractHoplog implements Hoplog {
+  protected final FSProvider fsProvider;
+  
+  // path of the oplog file
+  protected volatile Path path;
+  private volatile HoplogDescriptor hfd;
+  protected Configuration conf;
+  protected SortedOplogStatistics stats;
+  protected Long hoplogModificationTime;
+  protected Long hoplogSize;
+
+  protected HoplogReaderActivityListener readerListener;
+  
+  // logger instance
+  protected static final Logger logger = LogService.getLogger();
+  
+  protected static String logPrefix;
+  // THIS CONSTRUCTOR SHOULD BE USED FOR LONER ONLY
+  AbstractHoplog(FileSystem inputFS, Path filePath, SortedOplogStatistics stats)
+      throws IOException {
+    logPrefix = "<" + filePath.getName() + "> ";
+    this.fsProvider = new FSProvider(inputFS);
+    initialize(filePath, stats, inputFS);
+  }
+
+  public AbstractHoplog(HDFSStoreImpl store, Path filePath,
+      SortedOplogStatistics stats) throws IOException {
+    logPrefix = "<" + filePath.getName() + "> ";
+    this.fsProvider = new FSProvider(store);
+    initialize(filePath, stats, store.getFileSystem());
+  }
+
+  private void initialize(Path path, SortedOplogStatistics stats, FileSystem fs) {
+    this.conf = fs.getConf();
+    this.stats = stats;
+    this.path = fs.makeQualified(path);
+    this.hfd = new HoplogDescriptor(this.path.getName());
+  }
+  
+  @Override
+  public abstract void close() throws IOException;
+
+  @Override
+  public abstract HoplogReader getReader() throws IOException;
+
+  @Override
+  public abstract HoplogWriter createWriter(int keys) throws IOException;
+
+  @Override
+  public abstract void close(boolean clearCache) throws IOException;
+
+  @Override
+  public void setReaderActivityListener(HoplogReaderActivityListener listener) {
+    this.readerListener = listener;
+  }
+  
+  @Override
+  public String getFileName() {
+    return this.hfd.getFileName();
+  }
+  
+  public final int compareTo(Hoplog o) {
+    return hfd.compareTo( ((AbstractHoplog)o).hfd);
+  }
+
+  @Override
+  public ICardinality getEntryCountEstimate() throws IOException {
+    return null;
+  }
+  
+  @Override
+  public synchronized void rename(String name) throws IOException {
+    if (logger.isDebugEnabled())
+      logger.debug("{}Renaming hoplog to " + name, logPrefix);
+    Path parent = path.getParent();
+    Path newPath = new Path(parent, name);
+    fsProvider.getFS().rename(path, newPath);
+
+    // close the old reader and let the new one get created lazily
+    close();
+    
+    // update path to point to the new path
+    path = newPath;
+    this.hfd = new HoplogDescriptor(this.path.getName());
+    logPrefix = "<" + path.getName() + "> ";
+  }
+  
+  @Override
+  public synchronized void delete() throws IOException {
+    if (logger.isDebugEnabled())
+      logger.debug("{}Deleting hoplog", logPrefix);
+    close();
+    this.hoplogModificationTime = null;
+    this.hoplogSize = null;
+    fsProvider.getFS().delete(path, false);
+  }
+
+  @Override
+  public long getModificationTimeStamp() {
+    initHoplogSizeTimeInfo();
+
+    // modification time will not be null if this hoplog exists. Otherwise
+    // invocation of this method is invalid
+    if (hoplogModificationTime == null) {
+      throw new IllegalStateException();
+    }
+    
+    return hoplogModificationTime;
+  }
+
+  @Override
+  public long getSize() {
+    initHoplogSizeTimeInfo();
+    
+    // size will not be null if this hoplog exists. Otherwise
+    // invocation of this method is invalid
+    if (hoplogSize == null) {
+      throw new IllegalStateException();
+    }
+    
+    return hoplogSize;
+  }
+  
+  private synchronized void initHoplogSizeTimeInfo() {
+    if (hoplogSize != null && hoplogModificationTime != null) {
+      // time and size info is already initialized. no work needed here
+      return;
+    }
+
+    try {
+      FileStatus[] filesInfo = FSUtils.listStatus(fsProvider.getFS(), path, null);
+      if (filesInfo != null && filesInfo.length == 1) {
+        this.hoplogModificationTime = filesInfo[0].getModificationTime();
+        this.hoplogSize = filesInfo[0].getLen();
+      }
+      // TODO else condition may happen if user deletes hoplog from the file system.
+    } catch (IOException e) {
+      logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
+      throw new HDFSIOException(
+          LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),e);
+    }
+  }
+
+  public static SequenceFile.Writer getSequenceFileWriter(Path path,
+      Configuration conf, Logger logger) throws IOException {
+    return getSequenceFileWriter(path, conf, logger, null);
+  }
+  
+  /**
+   * Creates a SequenceFile writer for a new hoplog, stamping the gemfire
+   * version into the file metadata.
+   *
+   * @param version used only for testing; pass null for all other purposes
+   * @return a SequenceFile.Writer for the given path
+   */
+  public static SequenceFile.Writer getSequenceFileWriter(Path path, 
+    Configuration conf, Logger logger, Version version) throws IOException {
+    Option optPath = SequenceFile.Writer.file(path);
+    Option optKey = SequenceFile.Writer.keyClass(BytesWritable.class);
+    Option optVal = SequenceFile.Writer.valueClass(BytesWritable.class);
+    Option optCom = withCompression(logger);
+    if (logger.isDebugEnabled())
+      logger.debug("{}Started creating hoplog " + path, logPrefix);
+    
+    if (version == null)
+      version = Version.CURRENT;
+    //Create a metadata option with the gemfire version, for future versioning
+    //of the key and value format
+    SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+    metadata.set(new Text(Meta.GEMFIRE_VERSION.name()), new Text(String.valueOf(version.ordinal())));
+    Option optMeta = SequenceFile.Writer.metadata(metadata);
+    
+    SequenceFile.Writer writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom, optMeta);
+    
+    return writer;
+  }
+  
+  private static Option withCompression(Logger logger) {
+    String prop = System.getProperty(HoplogConfig.COMPRESSION);
+    if (prop != null) {
+      CompressionCodec codec;
+      if (prop.equalsIgnoreCase("SNAPPY")) {
+        codec = new SnappyCodec();
+      } else if (prop.equalsIgnoreCase("LZ4")) {
+        codec = new Lz4Codec();
+      } else if (prop.equals("GZ")) {
+        codec = new GzipCodec();
+      } else {
+        throw new IllegalStateException("Unsupported codec: " + prop);
+      }
+      if (logger.isDebugEnabled())
+        logger.debug("{}Using compression codec " + codec, logPrefix);
+      return SequenceFile.Writer.compression(CompressionType.BLOCK, codec);
+    }
+    return SequenceFile.Writer.compression(CompressionType.NONE, null);
+  }
+  
+  public static final class HoplogDescriptor implements Comparable<HoplogDescriptor> {
+     private final String fileName;
+     private final String bucket;
+     private final int sequence;
+     private final long timestamp;
+     private final String extension;
+     
+     HoplogDescriptor(final String fileName) {
+       this.fileName = fileName;
+       final Matcher matcher = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(fileName);
+       final boolean matched = matcher.find();
+       assert matched;
+       this.bucket = matcher.group(1);
+       this.sequence = Integer.valueOf(matcher.group(3));
+       this.timestamp = Long.valueOf(matcher.group(2)); 
+       this.extension = matcher.group(4);
+     }
+     
+     public final String getFileName() {
+       return fileName;
+     }
+     
+     @Override
+     public boolean equals(Object o) {
+       if (this == o) {
+         return true;
+       }
+       
+       if (!(o instanceof HoplogDescriptor)) {
+         return false;
+       }
+       
+       final HoplogDescriptor other = (HoplogDescriptor)o;
+       // the two files should belong to same bucket
+       assert this.bucket.equals(other.bucket);
+       
+       // compare sequence first
+       if (this.sequence != other.sequence) {
+         return false;
+       }
+       
+       // sequence is same, compare timestamps
+       if (this.timestamp != other.timestamp) {
+         return false;
+       }
+       
+       return extension.equals(other.extension);
+     }
+
+    @Override
+    public int compareTo(HoplogDescriptor o) {
+      if (this == o) {
+        return 0;
+      }
+      
+      // the two files should belong to same bucket
+      assert this.bucket.equals(o.bucket);
+      
+      // compare sequence first
+      if (sequence > o.sequence) {
+        return -1;
+      } else if (sequence < o.sequence) {
+        return 1;
+      }
+      
+      // sequence is same, compare timestamps
+      if(timestamp > o.timestamp) {
+        return -1; 
+      } else if (timestamp < o.timestamp) {
+        return 1;
+      }
+      
+      //timestamp is the same, compare the file extension. It's
+      //possible a major compaction and minor compaction could finish
+      //at the same time and create the same timestamp and sequence number
+      //it doesn't matter which file we look at first in that case.
+      return extension.compareTo(o.extension);
+    }
+  }
+  
+  protected static final class FSProvider {
+    final FileSystem fs;
+    final HDFSStoreImpl store;
+    
+    // THIS METHOD IS FOR TESTING ONLY
+    FSProvider(FileSystem fs) {
+      this.fs = fs;
+      this.store = null;
+    }
+    
+    FSProvider(HDFSStoreImpl store) {
+      this.store = store;
+      fs = null;
+    }
+    
+    public FileSystem getFS() throws IOException {
+      if (store != null) {
+        return store.getFileSystem();
+      }
+      return fs;
+    }
+
+    public FileSystem checkFileSystem() {
+      store.checkAndClearFileSystem();
+      return store.getCachedFileSystem();
+    }
+  }
+}
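
A hedged sketch of using the static getSequenceFileWriter helper above to
write one key/value record (the path and bytes are made up; append/close are
assumed to match the standard SequenceFile.Writer calls, here on GemFire's
internal copy of that class):

    Configuration conf = new Configuration();
    Path tmp = new Path("/hdfsstore/region/0/0-1423000000000-1.hop.tmp"); // hypothetical
    Logger logger = LogService.getLogger();

    byte[] keyBytes = {1, 2, 3};
    byte[] valueBytes = {4, 5, 6};

    SequenceFile.Writer writer = AbstractHoplog.getSequenceFileWriter(tmp, conf, logger);
    try {
      writer.append(new BytesWritable(keyBytes), new BytesWritable(valueBytes));
    } finally {
      writer.close();
    }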

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
new file mode 100644
index 0000000..4f078d8
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog.HoplogDescriptor;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+
+public abstract class AbstractHoplogOrganizer<T extends PersistedEventImpl> implements HoplogOrganizer<T> {
+
+  public static final String MINOR_HOPLOG_EXTENSION = ".ihop";
+  public static final String MAJOR_HOPLOG_EXTENSION = ".chop";
+  public static final String EXPIRED_HOPLOG_EXTENSION = ".exp";
+  public static final String TEMP_HOPLOG_EXTENSION = ".tmp";
+
+  public static final String FLUSH_HOPLOG_EXTENSION = ".hop";
+  public static final String SEQ_HOPLOG_EXTENSION = ".shop";
+
+  // all valid hoplogs will follow the following name pattern
+  public static final String HOPLOG_NAME_REGEX = "(.+?)-(\\d+?)-(\\d+?)";
+  public static final Pattern HOPLOG_NAME_PATTERN = Pattern.compile(HOPLOG_NAME_REGEX
+      + "\\.(.*)");
+  
+  public static boolean JUNIT_TEST_RUN = false; 
+
+  protected static final boolean ENABLE_INTEGRITY_CHECKS = Boolean
+      .getBoolean("gemfire.HdfsSortedOplogOrganizer.ENABLE_INTEGRITY_CHECKS")
+      || assertionsEnabled();
+
+  private static boolean assertionsEnabled() {
+    boolean enabled = false;
+    assert enabled = true;
+    return enabled;
+  }
+
+  protected HdfsRegionManager regionManager;
+  // name or id of bucket managed by this organizer
+  protected final String regionFolder;
+  protected final int bucketId;
+
+  // path of the region directory
+  protected final Path basePath;
+  // identifies path of directory containing a bucket's oplog files
+  protected final Path bucketPath;
+
+  protected final HDFSStoreImpl store;
+
+  // assigns a unique increasing number to each oplog file
+  protected AtomicInteger sequence;
+
+  //logger instance
+  protected static final Logger logger = LogService.getLogger();
+  protected final String logPrefix;
+
+  protected SortedOplogStatistics stats;
+  AtomicLong bucketDiskUsage = new AtomicLong(0);
+
+  // creation of new files and expiration of files will be synchronously
+  // notified to the listener
+  protected HoplogListener listener;
+
+  private volatile boolean closed = false;
+  
+  protected Object changePrimarylockObject = new Object();
+  
+  public AbstractHoplogOrganizer(HdfsRegionManager region, int bucketId) {
+
+    assert region != null;
+
+    this.regionManager = region;
+    this.regionFolder = region.getRegionFolder();
+    this.store = region.getStore();
+    this.listener = region.getListener();
+    this.stats = region.getHdfsStats();
+    
+    this.bucketId = bucketId;
+
+    this.basePath = new Path(store.getHomeDir());
+    this.bucketPath = new Path(basePath, regionFolder + "/" + bucketId);
+
+    this.logPrefix = "<" + getRegionBucketStr() + "> ";
+    
+  }
+
+  @Override
+  public boolean isClosed() {
+    return closed || regionManager.isClosed();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    closed = true;
+    
+    // this bucket is closed and may be owned by a new node. So reduce the store
+    // usage stat, as the new owner adds the usage metric
+    incrementDiskUsage((-1) * bucketDiskUsage.get());
+  }
+
+  @Override
+  public abstract void flush(Iterator<? extends QueuedPersistentEvent> bufferIter,
+      int count) throws IOException, ForceReattemptException;
+
+  @Override
+  public abstract void clear() throws IOException;
+
+  protected abstract Hoplog getHoplog(Path hoplogPath) throws IOException;
+
+  @Override
+  public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
+      throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs)
+      throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public void compactionCompleted(String region, int bucket, boolean isMajor) {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+  
+  @Override
+  public T read(byte[] key) throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public HoplogIterator<byte[], T> scan() throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public HoplogIterator<byte[], T> scan(byte[] from, byte[] to)
+      throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public HoplogIterator<byte[], T> scan(byte[] from,
+      boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public long sizeEstimate() {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  /**
+   * @return the oplog's full path, formed by prefixing the bucket path to the
+   *         file name
+   */
+  protected String getPathStr(Hoplog oplog) {
+    return bucketPath.toString() + "/" + oplog.getFileName();
+  }
+
+  protected String getRegionBucketStr() {
+    return regionFolder + "/" + bucketId;
+  }
+
+  protected SortedHoplogPersistedEvent deserializeValue(byte[] val) throws IOException {
+    try {
+      return SortedHoplogPersistedEvent.fromBytes(val);
+    } catch (ClassNotFoundException e) {
+      logger
+          .error(
+              LocalizedStrings.GetMessage_UNABLE_TO_DESERIALIZE_VALUE_CLASSNOTFOUNDEXCEPTION,
+              e);
+      return null;
+    }
+  }
+
+  /**
+   * @return true if the entry belongs to a destroy or invalidate event
+   */
+  protected boolean isDeletedEntry(byte[] value, int offset) throws IOException {
+    // Read only the first byte of PersistedEventImpl for the operation
+    assert value != null && value.length > 0 && offset >= 0 && offset < value.length;
+    Operation op = Operation.fromOrdinal(value[offset]);
+
+    if (op.isDestroy() || op.isInvalidate()) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @param seqNum
+   *          desired sequence number of the hoplog. If null, the next number
+   *          in the sequence is chosen
+   * @param extension
+   *          file extension representing the type of file, e.g. ihop for an
+   *          intermediate hoplog
+   * @return a new temporary file for a new sorted oplog. The name consists of
+   *         the bucket name, a timestamp, and a sequence number for ordering
+   *         the files
+   */
+  Hoplog getTmpSortedOplog(Integer seqNum, String extension) throws IOException {
+    if (seqNum == null) {
+      seqNum = sequence.incrementAndGet();
+    }
+    String name = bucketId + "-" + System.currentTimeMillis() + "-" + seqNum 
+        + extension;
+    Path soplogPath = new Path(bucketPath, name + TEMP_HOPLOG_EXTENSION);
+    return getHoplog(soplogPath);
+  }
+  
+  /**
+   * renames a temporary hoplog file to a legitimate name.
+   */
+  static void makeLegitimate(Hoplog so) throws IOException {
+    String name = so.getFileName();
+    assert name.endsWith(TEMP_HOPLOG_EXTENSION);
+
+    int index = name.lastIndexOf(TEMP_HOPLOG_EXTENSION);
+    name = name.substring(0, index);
+    so.rename(name);
+  }
+
+  /**
+   * Creates an expiry marker for a file on the file system.
+   *
+   * @param hoplog the hoplog to be marked as expired
+   */
+  protected void addExpiryMarkerForAFile(Hoplog hoplog) throws IOException {
+    FileSystem fs = store.getFileSystem();
+
+    // TODO optimization needed here. instead of creating expired marker
+    // file per file, create a meta file. the main thing to worry is
+    // compaction of meta file itself
+    Path expiryMarker = getExpiryMarkerPath(hoplog.getFileName());
+
+    // uh-oh, why are we trying to expire an already expired file?
+    if (ENABLE_INTEGRITY_CHECKS) {
+      Assert.assertTrue(!fs.exists(expiryMarker),
+          "Expiry marker already exists: " + expiryMarker);
+    }
+
+    FSDataOutputStream expiryMarkerFile = fs.create(expiryMarker);
+    expiryMarkerFile.close();
+
+    if (logger.isDebugEnabled())
+      logger.debug("Hoplog marked expired: " + getPathStr(hoplog));
+  }
+
+  protected Path getExpiryMarkerPath(String name) {
+    return new Path(bucketPath, name + EXPIRED_HOPLOG_EXTENSION);
+  }
+  
+  protected String truncateExpiryExtension(String name) {
+    if (name.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
+      return name.substring(0, name.length() - EXPIRED_HOPLOG_EXTENSION.length());
+    }
+    
+    return name;
+  }
+  
+  /**
+   * Updates region stats and a local copy of the bucket-level store usage
+   * metric.
+   *
+   * @param delta change in store usage, in bytes
+   */
+  protected void incrementDiskUsage(long delta) {
+    long newSize = bucketDiskUsage.addAndGet(delta);
+    if (newSize < 0 && delta < 0) {
+      if (logger.isDebugEnabled()){
+        logger.debug("{}Invalid diskUsage size:" + newSize + " caused by delta:"
+            + delta + ", parallel del & close?" + isClosed(), logPrefix);
+      }
+      if (isClosed()) {
+        // avoid corrupting disk usage size during close by reducing residue
+        // size only
+        delta = delta + (-1 * newSize);
+      }
+    }
+    stats.incStoreUsageBytes(delta);
+  }
+
+  /**
+   * Utility method to remove a file from the valid file list if an expired
+   * marker for the file exists.
+   *
+   * @param valid
+   *          list of valid files
+   * @param expired
+   *          list of expired file markers
+   * @return list of valid files that do not have an expired file marker
+   */
+  public static FileStatus[] filterValidHoplogs(FileStatus[] valid,
+      FileStatus[] expired) {
+    if (valid == null) {
+      return null;
+    }
+
+    if (expired == null) {
+      return valid;
+    }
+
+    ArrayList<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus vs : valid) {
+      boolean found = false;
+      for (FileStatus ex : expired) {
+        if (ex.getPath().getName().equals(vs.getPath().getName()
+            + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        result.add(vs);
+      }
+    }
+
+    return result.toArray(new FileStatus[result.size()]);
+  }
+
+  protected void pingSecondaries() throws ForceReattemptException {
+    if (JUNIT_TEST_RUN)
+      return;
+    BucketRegion br = ((PartitionedRegion)this.regionManager.getRegion()).getDataStore().getLocalBucketById(this.bucketId);
+    boolean secondariesPingable = false;
+    try {
+      secondariesPingable = br.areSecondariesPingable();
+    } catch (Throwable e) {
+      throw new ForceReattemptException("Failed to ping secondary servers of bucket: " + 
+          this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()), e);
+    }
+    if (!secondariesPingable)
+      throw new ForceReattemptException("Failed to ping secondary servers of bucket: " + 
+          this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()));
+  }
+
+
+  /**
+   * A comparator for ordering soplogs based on the file name. The file names
+   * are assigned incrementally and hint at the age of the file
+   */
+  public static final class HoplogComparator implements
+      Comparator<TrackedReference<Hoplog>> {
+    /**
+     * A file with a higher sequence number or timestamp is younger and hence
+     * sorts as smaller.
+     */
+    @Override
+    public int compare(TrackedReference<Hoplog> o1, TrackedReference<Hoplog> o2) {
+      return o1.get().compareTo(o2.get());
+    }
+
+    /**
+     * Compares the age of files based on their names and returns 1 if name1 is
+     * older, -1 if name1 is younger, and 0 if the two files are the same age.
+     */
+    public static int compareByName(String name1, String name2) {
+      HoplogDescriptor hd1 = new HoplogDescriptor(name1);
+      HoplogDescriptor hd2 = new HoplogDescriptor(name2);
+      
+      return hd1.compareTo(hd2);
+    }
+  }
+
+  /**
+   * @param matcher
+   *          A preinitialized / matched regex pattern
+   * @return timestamp embedded in the matched hoplog file name
+   */
+  public static long getHoplogTimestamp(Matcher matcher) {
+    return Long.parseLong(matcher.group(2));
+  }
+}
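For illustration, a minimal sketch of how the expired-marker filtering and the
name-based ordering above might be driven from a Hadoop FileSystem listing. The
bucket path and the ".exp" suffix are stand-ins for the real layout and for the
value of HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION (not shown here),
and the sketch assumes it runs inside, or statically imports from, the
organizer class above.

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static FileStatus[] listLiveHoplogs(Path bucketDir) throws IOException {
      FileSystem fs = FileSystem.get(new Configuration());
      // Live hoplogs are everything without an expired-marker suffix.
      FileStatus[] valid = fs.listStatus(bucketDir, p -> !p.getName().endsWith(".exp"));
      // Expired markers are files named <hoplog-name> + the expired extension.
      FileStatus[] expired = fs.listStatus(bucketDir, p -> p.getName().endsWith(".exp"));
      // Drop every hoplog that has a matching expired marker.
      FileStatus[] live = filterValidHoplogs(valid, expired);
      // Youngest files sort first: compareByName treats younger names as smaller.
      Arrays.sort(live, (a, b) -> HoplogComparator.compareByName(
          a.getPath().getName(), b.getPath().getName()));
      return live;
    }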

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
new file mode 100644
index 0000000..86e66a1
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+public interface BloomFilter {
+  /**
+   * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
+   * is such that false positives are allowed, but false negatives cannot occur.
+   */
+  boolean mightContain(byte[] key);
+
+  /**
+   * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
+   * is such that false positives are allowed, but false negatives cannot occur.
+   */
+  boolean mightContain(byte[] key, int keyOffset, int keyLength);
+
+  /**
+   * @return Size of the bloom, in bytes
+   */
+  long getBloomSize();
+}
\ No newline at end of file
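The contract above (false positives possible, false negatives impossible) is
what lets a reader skip a hoplog entirely on a negative answer. A minimal
sketch of that pattern; lookupInHoplog is a hypothetical stand-in for the real
read path, not part of this commit.

    static byte[] readIfPresent(BloomFilter bloom, byte[] key) {
      if (!bloom.mightContain(key, 0, key.length)) {
        return null;                 // definite miss: no false negatives
      }
      return lookupInHoplog(key);    // maybe present; may still be a false positive
    }

    // Hypothetical stub standing in for the actual hoplog read.
    static byte[] lookupInHoplog(byte[] key) {
      return null;
    }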

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
new file mode 100644
index 0000000..3f67de8
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.Collection;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * In the streaming case, if bucket traffic dies down after a few batches of
+ * data have been written, flush is not called again and the file is left in a
+ * tmp state until the flush restarts. To avoid this, this timer task
+ * periodically iterates over the buckets and closes their writers if the time
+ * for rollover has passed.
+ * 
+ * It also has the extra responsibility of fixing the sizes of files that were
+ * not closed properly last time.
+ */
+class CloseTmpHoplogsTimerTask extends SystemTimerTask {
+  
+  private HdfsRegionManager hdfsRegionManager;
+  private static final Logger logger = LogService.getLogger();
+  private FileSystem filesystem; 
+  
+  public CloseTmpHoplogsTimerTask(HdfsRegionManager hdfsRegionManager) {
+    this.hdfsRegionManager = hdfsRegionManager;
+    
+    // Create a new filesystem 
+    // This is added for the following reason:
+    // For HDFS, if a file wasn't closed properly last time, 
+    // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
+    // FSNamesystem.recoverLeaseInternal function gets called. 
+    // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file, 
+    // created using the same FileSystem object. This is a bug and is being tracked at: 
+    // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
+    // 
+    // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug, 
+    // we create a new file system for the timer task so that it does not encounter the bug. 
+    this.filesystem = this.hdfsRegionManager.getStore().createFileSystem();
+    if (logger.isDebugEnabled()) 
+      logger.debug("created a new file system specifically for timer task");
+  }
+
+  
+  /**
+   * Iterates over all the bucket organizers and closes their writer if the time for 
+   * rollover has passed. It also has the additional responsibility of fixing the tmp
+   * files that were left over in the last unsuccessful run. 
+   */
+  @Override
+  public void run2() {
+    Collection<HoplogOrganizer> organizers =  hdfsRegionManager.getBucketOrganizers();
+    if (logger.isDebugEnabled()) 
+      logger.debug("Starting the close temp logs run.");
+    
+    for (HoplogOrganizer organizer: organizers) {
+      
+      HDFSUnsortedHoplogOrganizer unsortedOrganizer = (HDFSUnsortedHoplogOrganizer)organizer;
+      long timeSinceLastFlush = (System.currentTimeMillis() - unsortedOrganizer.getLastFlushTime()) / 1000;
+      try {
+        this.hdfsRegionManager.getRegion().checkReadiness();
+      } catch (Exception e) {
+        break;
+      }
+      
+      try {
+        // If the time since the last flush has exceeded the file rollover
+        // interval, roll over the file.
+        if (timeSinceLastFlush >= unsortedOrganizer.getfileRolloverInterval()) {
+          if (logger.isDebugEnabled()) 
+            logger.debug("Closing writer for bucket: " + unsortedOrganizer.bucketId);
+          unsortedOrganizer.synchronizedCloseWriter(false, timeSinceLastFlush, 0);
+        }
+        
+        // fix the tmp hoplogs, if any. Pass the new file system here. 
+        unsortedOrganizer.identifyAndFixTmpHoplogs(this.filesystem);
+        
+      } catch (Exception e) {
+        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
+      }
+    }
+    
+  }
+}
+
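The rollover check the task performs can be pictured in isolation. A hedged
sketch using java.util.Timer in place of Geode's internal SystemTimer; the
rollover interval, check period and flush bookkeeping below are assumptions,
not values from this commit.

    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.atomic.AtomicLong;

    class RolloverCheckSketch {
      static final long ROLLOVER_SECS = 60;   // assumed rollover interval
      static final AtomicLong lastFlushMillis =
          new AtomicLong(System.currentTimeMillis());

      public static void main(String[] args) {
        Timer timer = new Timer("close-tmp-hoplogs", true);   // daemon, like the real task
        timer.scheduleAtFixedRate(new TimerTask() {
          @Override
          public void run() {
            long idleSecs = (System.currentTimeMillis() - lastFlushMillis.get()) / 1000;
            if (idleSecs >= ROLLOVER_SECS) {
              // The real task would close the bucket's writer here so the
              // tmp hoplog is finalized even though no flush arrived.
            }
          }
        }, 0L, 10_000L);   // assumed 10-second check period
      }
    }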

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
new file mode 100644
index 0000000..55d8f87
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Status of a compaction task, reported through the future returned at submission
+ * 
+ */
+public class CompactionStatus implements VersionedDataSerializable {
+  /* TODO MergeGemXDHDFSToGFE: check and verify serializationVersions */
+ 
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+  private int bucketId;
+  private boolean status;
+
+  public CompactionStatus() {
+  }
+
+  public CompactionStatus(int bucketId, boolean status) {
+    this.bucketId = bucketId;
+    this.status = status;
+  }
+  public int getBucketId() {
+    return bucketId;
+  }
+  public boolean isStatus() {
+    return status;
+  }
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(bucketId);
+    out.writeBoolean(status);
+  }
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.bucketId = in.readInt();
+    this.status = in.readBoolean();
+  }
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this)).append(" Bucket:")
+    .append(bucketId).append(" status:").append(status);
+    return sb.toString();
+  }
+}
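Because toData and fromData write plain primitives, the wire format is just an
int followed by a boolean. A small round-trip sketch over the standard java.io
data streams (the bucket id 42 is an arbitrary example value):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public static void main(String[] args) throws Exception {
      CompactionStatus sent = new CompactionStatus(42, true);
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      sent.toData(new DataOutputStream(buf));          // writes int + boolean

      CompactionStatus received = new CompactionStatus();
      received.fromData(new DataInputStream(
          new ByteArrayInputStream(buf.toByteArray())));
      assert received.getBucketId() == 42 && received.isStatus();
    }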

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
new file mode 100644
index 0000000..84beded
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Reports the result of a flush request.
+ * 
+ */
+public class FlushStatus implements VersionedDataSerializable {
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+  private int bucketId;
+
+  private final static int LAST = -1;
+  
+  public FlushStatus() {
+  }
+
+  public static FlushStatus last() {
+    return new FlushStatus(LAST);
+  }
+  
+  public FlushStatus(int bucketId) {
+    this.bucketId = bucketId;
+  }
+  public int getBucketId() {
+    return bucketId;
+  }
+  public boolean isLast() {
+    return bucketId == LAST;
+  }
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(bucketId);
+  }
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.bucketId = in.readInt();
+  }
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this)).append(" Bucket:")
+    .append(bucketId);
+    return sb.toString();
+  }
+}
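FlushStatus.last() acts as an end-of-stream sentinel (bucketId == -1). A sketch
of how a consumer might drain per-bucket results until the sentinel arrives;
the blocking queue is a hypothetical stand-in for the real reply channel.

    import java.util.concurrent.BlockingQueue;

    static void drainFlushReplies(BlockingQueue<FlushStatus> replies)
        throws InterruptedException {
      for (FlushStatus s = replies.take(); !s.isLast(); s = replies.take()) {
        System.out.println("flushed bucket " + s.getBucketId());
      }
      // isLast() == true: every bucket has reported, stop waiting.
    }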

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
new file mode 100644
index 0000000..ba191c2
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * A singleton which schedules compaction of hoplogs owned by this node as primary and manages the
+ * executor of ongoing compactions. Ideally the number of pending requests will not exceed the
+ * number of buckets on the node, as the hoplog organizer avoids creating a new request while
+ * compaction on the bucket is active. Moreover, separate queues for major and minor compactions
+ * are maintained to prevent long-running major compactions from blocking minor compactions.
+ */
+public class HDFSCompactionManager {
+  /*
+   * Each hdfs store has its own concurrency configuration, which the
+   * compaction manager uses to manage threads. This member holds the
+   * hdfs-store to compaction-manager mapping
+   */
+  private static final ConcurrentHashMap<String, HDFSCompactionManager> storeToManagerMap = 
+                                        new ConcurrentHashMap<String, HDFSCompactionManager>();
+
+  // hdfs store configuration used to initialize this instance
+  HDFSStore storeConfig;
+  
+  // Executor for ordered execution of minor compaction requests.
+  private final CompactionExecutor minorCompactor;
+  // Executor for ordered execution of major compaction requests.
+  private final CompactionExecutor majorCompactor;
+
+  private static final Logger logger = LogService.getLogger();
+  protected final static String logPrefix = "<HDFSCompactionManager> ";
+  
+  private HDFSCompactionManager(HDFSStore config) {
+    this.storeConfig = config;
+    // configure hdfs compaction manager
+    int capacity = Integer.getInteger(HoplogConfig.COMPCATION_QUEUE_CAPACITY,
+        HoplogConfig.COMPCATION_QUEUE_CAPACITY_DEFAULT);
+
+    minorCompactor = new CompactionExecutor(config.getMinorCompactionThreads(), capacity, "MinorCompactor_"
+        + config.getName());
+
+    majorCompactor = new CompactionExecutor(config.getMajorCompactionThreads(), capacity, "MajorCompactor_"
+        + config.getName());
+
+    minorCompactor.allowCoreThreadTimeOut(true);
+    majorCompactor.allowCoreThreadTimeOut(true);
+  }
+
+  public static synchronized HDFSCompactionManager getInstance(HDFSStore config) {
+    HDFSCompactionManager instance = storeToManagerMap.get(config.getName());
+    if (instance == null) {
+      instance = new HDFSCompactionManager(config);
+      storeToManagerMap.put(config.getName(), instance);
+    }
+    
+    return instance;
+  }
+
+  /**
+   * Accepts a compaction request for asynchronous execution.
+   * 
+   * @param request
+   *          compaction request with region and bucket id
+   * @return a future holding the compaction status, or null if the request was
+   *         not accepted, e.g. because the compactor is busy or overloaded
+   */
+  public synchronized Future<CompactionStatus> submitRequest(CompactionRequest request) {
+    if (!request.isForced && request.compactor.isBusy(request.isMajor)) {
+      if (logger.isDebugEnabled()) {
+        fineLog("Compactor is busy. Ignoring ", request);
+      }
+      return null;
+    }
+    
+    CompactionExecutor executor = request.isMajor ? majorCompactor : minorCompactor;
+    
+    try {
+      return executor.submit(request);
+    } catch (Throwable e) {
+      if (e instanceof CompactionIsDisabled) {
+        if (logger.isDebugEnabled()) {
+          fineLog(e.getMessage());
+        }
+      } else {
+        logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, "Compaction request submission failed"), e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Removes all pending compaction requests. For TESTING ONLY
+   */
+  public void reset() {
+    minorCompactor.shutdownNow();
+    majorCompactor.shutdownNow();
+    HDFSCompactionManager.storeToManagerMap.remove(storeConfig.getName());
+  }
+  
+  /**
+   * Returns the minor compactor. For TESTING AND MONITORING ONLY
+   */
+  public ThreadPoolExecutor getMinorCompactor() {
+    return minorCompactor;
+  }
+
+  /**
+   * Returns the major compactor. For TESTING AND MONITORING ONLY
+   */
+  public ThreadPoolExecutor getMajorCompactor() {
+    return majorCompactor;
+  }
+  
+  /**
+   * Contains important details needed for executing a compaction cycle.
+   */
+  public static class CompactionRequest implements Callable<CompactionStatus> {
+    String regionFolder;
+    int bucket;
+    Compactor compactor;
+    boolean isMajor;
+    final boolean isForced;
+    final boolean versionUpgrade;
+
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major) {
+      this(regionFolder, bucket, compactor, major, false);
+    }
+
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced) {
+      this(regionFolder, bucket, compactor, major, isForced, false);
+    }
+
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced, boolean versionUpgrade) {
+      this.regionFolder = regionFolder;
+      this.bucket = bucket;
+      this.compactor = compactor;
+      this.isMajor = major;
+      this.isForced = isForced;
+      this.versionUpgrade = versionUpgrade;
+    }
+
+    @Override
+    public CompactionStatus call() throws Exception {
+      HDFSStore store = compactor.getHdfsStore();
+      if (!isForced) {
+        // this is an auto-generated compaction request. If auto compaction is
+        // disabled, ignore this call.
+        if (isMajor && !store.getMajorCompaction()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Major compaction is disabled. Ignoring request",logPrefix);
+          }
+          return new CompactionStatus(bucket, false);
+        } else if (!isMajor && !store.getMinorCompaction()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Minor compaction is disabled. Ignoring request", logPrefix);
+          }
+          return new CompactionStatus(bucket, false);
+        }
+      }
+
+      // all hurdles passed, execute compaction now
+      try {
+        boolean status = compactor.compact(isMajor, versionUpgrade);
+        return new CompactionStatus(bucket, status);
+      } catch (IOException e) {
+        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_COMPACTION_ERROR, bucket), e);
+      }
+      return new CompactionStatus(bucket, false);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + bucket;
+      result = prime * result
+          + ((regionFolder == null) ? 0 : regionFolder.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      CompactionRequest other = (CompactionRequest) obj;
+      if (bucket != other.bucket)
+        return false;
+      if (regionFolder == null) {
+        if (other.regionFolder != null)
+          return false;
+      } else if (!regionFolder.equals(other.regionFolder))
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "CompactionRequest [regionFolder=" + regionFolder + ", bucket="
+          + bucket + ", isMajor=" + isMajor + ", isForced="+isForced+"]";
+    }
+  }
+
+  /**
+   * Helper class for creating named instances of compaction threads and managing the compaction
+   * executor. Idle threads time out after five seconds
+   */
+  private class CompactionExecutor extends ThreadPoolExecutor implements ThreadFactory {
+    final AtomicInteger count = new AtomicInteger(1);
+    private String name;
+
+    CompactionExecutor(int max, int capacity, String name) {
+      super(max, max, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(capacity));
+      allowCoreThreadTimeOut(true);
+      setThreadFactory(this);
+      this.name = name;
+    }
+    
+    private void throwIfStopped(CompactionRequest req, HDFSStore storeConfig) {
+      // Compaction can be enabled or disabled at any time by an alter on the
+      // store, so re-read the flag on every request
+      boolean isEnabled = req.isMajor ? storeConfig.getMajorCompaction()
+          : storeConfig.getMinorCompaction();
+      if (isEnabled || req.isForced) {
+        return;
+      }
+      throw new CompactionIsDisabled(name + " is disabled");
+    }
+
+    private void throwIfPoolSizeChanged(CompactionRequest task, HDFSStore config) {
+      int threadCount = config.getMinorCompactionThreads();
+      if (task.isMajor) {
+        threadCount = config.getMajorCompactionThreads();
+      }
+      
+      if (getCorePoolSize() != threadCount) {
+        setCorePoolSize(threadCount);
+      }
+      
+      if (!task.isForced && getActiveCount() > threadCount) {
+        // the number of active threads exceeds the new max pool size; throw an
+        // error if this is a system-generated compaction request
+        throw new CompactionIsDisabled(
+            "Rejecting to reduce the number of threads for " + name
+            + ", currently:" + getActiveCount() + " target:"
+            + threadCount);
+      }
+    }
+    
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      throwIfStopped((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
+      throwIfPoolSizeChanged((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
+      
+      if (logger.isDebugEnabled()) {
+        fineLog("New:", task, " pool:", getPoolSize(), " active:", getActiveCount());
+      }
+      return super.submit(task);
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r, name + ":" + count.getAndIncrement());
+      thread.setDaemon(true);
+      if (logger.isDebugEnabled()) {
+        fineLog("New thread:", name, " poolSize:", getPoolSize(),
+            " active:", getActiveCount());
+      }
+      return thread;
+    }
+  }
+  
+  public static class CompactionIsDisabled extends RejectedExecutionException {
+    private static final long serialVersionUID = 1L;
+    public CompactionIsDisabled(String name) {
+      super(name);
+    }
+  }
+  
+  
+  private void fineLog(Object... strings) {
+    if (logger.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      for (Object str : strings) {
+        sb.append(str.toString());
+      }
+      logger.debug("{}"+sb.toString(), logPrefix);
+    }
+  }
+}
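To see the submission path end to end, a hedged sketch of a caller forcing a
major compaction of one bucket and blocking on the result. The store, region
folder and Compactor are assumed to come from the region's hoplog organizer;
only getInstance, CompactionRequest and submitRequest are taken from the class
above.

    import java.util.concurrent.Future;
    import com.gemstone.gemfire.cache.hdfs.HDFSStore;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;

    static boolean forceMajorCompaction(HDFSStore store, Compactor compactor,
        String regionFolder, int bucketId) throws Exception {
      HDFSCompactionManager mgr = HDFSCompactionManager.getInstance(store);
      CompactionRequest req = new CompactionRequest(regionFolder, bucketId,
          compactor, true /* major */, true /* forced */);
      Future<CompactionStatus> future = mgr.submitRequest(req);
      if (future == null) {
        return false;   // rejected: compaction disabled or executor overloaded
      }
      return future.get().isStatus();   // blocks until the compaction cycle ends
    }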

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
new file mode 100644
index 0000000..36e171b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Defines the arguments to the flush queue request.
+ * 
+ */
+@SuppressWarnings("serial")
+public class HDFSFlushQueueArgs implements VersionedDataSerializable {
+
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+  private HashSet<Integer> buckets;
+
+  private long maxWaitTimeMillis;
+
+  public HDFSFlushQueueArgs() {
+  }
+
+  public HDFSFlushQueueArgs(Set<Integer> buckets, long maxWaitTime) {
+    this.buckets = new HashSet<Integer>(buckets);
+    this.maxWaitTimeMillis = maxWaitTime;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashSet(buckets, out);
+    out.writeLong(maxWaitTimeMillis);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    this.buckets = DataSerializer.readHashSet(in);
+    this.maxWaitTimeMillis = in.readLong();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+
+  public Set<Integer> getBuckets() {
+    return buckets;
+  }
+
+  public void setBuckets(Set<Integer> buckets) {
+    this.buckets = new HashSet<Integer>(buckets);
+  }
+
+  public boolean isSynchronous() {
+    return maxWaitTimeMillis == 0;
+  }
+
+  public long getMaxWaitTime() {
+    return this.maxWaitTimeMillis;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this))
+    .append(" buckets:").append(buckets)
+    .append(" maxWaitTime:").append(maxWaitTimeMillis);
+    return sb.toString();
+  }
+}
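A small usage sketch for the arguments class; per isSynchronous() above, a max
wait time of zero marks the request as synchronous. The bucket ids are
arbitrary example values.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public static void main(String[] args) {
      Set<Integer> buckets = new HashSet<>(Arrays.asList(0, 1, 2));
      HDFSFlushQueueArgs flushArgs = new HDFSFlushQueueArgs(buckets, 0L);
      assert flushArgs.isSynchronous();          // maxWaitTimeMillis == 0
      assert flushArgs.getBuckets().size() == 3;
    }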