You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/27 23:16:49 UTC

[17/22] incubator-geode git commit: GEODE-1072: Removing HDFS related code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
deleted file mode 100644
index 82e2bf9..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.internal.DataSerializableFixedID;
-import com.gemstone.gemfire.internal.cache.CachedDeserializable;
-import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.lru.Sizeable;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Event that is persisted in HDFS. Only some of the EntryEventImpl variables need to be
- * persisted, so this class holds them and defines its own toData and fromData functions.
- *
- * There are subclasses of this class for the different types of persisted events
- * (sorted vs. unsorted), and for the persisted events we keep in the region
- * queue, which need to hold the region key.
- *
- * toData writes the operation ordinal byte, the flags byte, then the value encoded per the flags.
- */
-public abstract class PersistedEventImpl {
-  protected Operation op = Operation.UPDATE;
-  
-  protected Object valueObject;
-
-  /**
-   * A field with flags describing the event
-   */
-  protected byte flags;
-
-   // Flags indicating the type of value.
-   // If the value is not a byte array or object, it is an internal delta.
-  private static final byte VALUE_IS_BYTE_ARRAY= 0x01;
-  private static final byte VALUE_IS_OBJECT= (VALUE_IS_BYTE_ARRAY << 1);
-  private static final byte POSSIBLE_DUPLICATE = (VALUE_IS_OBJECT << 1);
-  private static final byte HAS_VERSION_TAG = (POSSIBLE_DUPLICATE << 1);
-  
-
-  /** for deserialization */
-  public PersistedEventImpl() {
-  }
-  
-  // valueIsObject 0x00 marks a byte[] value and 0x01 a serialized object; other codes set neither flag.
-  public PersistedEventImpl(Object value, Operation op, byte valueIsObject,
-      boolean isPossibleDuplicate, boolean hasVersionTag) throws IOException,
-      ClassNotFoundException {
-    this.op = op;
-    this.valueObject = value;
-    setFlag(VALUE_IS_BYTE_ARRAY, valueIsObject == 0x00);
-    setFlag(VALUE_IS_OBJECT, valueIsObject == 0x01);
-    setFlag(POSSIBLE_DUPLICATE, isPossibleDuplicate);
-    setFlag(HAS_VERSION_TAG, hasVersionTag);
-  }
-  
-  private void setFlag(byte flag, boolean set) {
-    flags = (byte) (set ?  flags | flag :  flags & ~flag);
-  }
-  
-  private boolean getFlag(byte flag) {
-    return (flags & flag) != 0x0;
-  }
-
-  public void toData(DataOutput out) throws IOException {
-    out.writeByte(this.op.ordinal);
-    out.writeByte(this.flags);
-    // The flags select the value encoding: raw byte[], object written as a byte[], or plain writeObject.
-    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
-      DataSerializer.writeByteArray((byte[])this.valueObject, out);
-    } else if (getFlag(VALUE_IS_OBJECT)) {
-      if(valueObject instanceof CachedDeserializable) {
-        CachedDeserializable cd = (CachedDeserializable)valueObject;
-        DataSerializer.writeObjectAsByteArray(cd.getValue(), out);
-      } else {
-        DataSerializer.writeObjectAsByteArray(valueObject, out);
-      }
-    }
-    else {
-      DataSerializer.writeObject(valueObject, out);
-    }
-  }
-
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this.op = Operation.fromOrdinal(in.readByte());
-    this.flags = in.readByte();
-    // Mirror of toData: the flags that were just read select how the value is decoded.
-    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
-      this.valueObject = DataSerializer.readByteArray(in);
-    } else if (getFlag(VALUE_IS_OBJECT)) {
-      byte[] newValueBytes = DataSerializer.readByteArray(in);
-      if(newValueBytes == null) {
-        this.valueObject = null;
-      } else {
-        if(CachedDeserializableFactory.preferObject()) {
-          this.valueObject =  EntryEventImpl.deserialize(newValueBytes);
-        } else {
-          this.valueObject = CachedDeserializableFactory.create(newValueBytes);
-        }
-      }
-    }
-    else {
-      this.valueObject = DataSerializer.readObject(in);
-    }
-    
-  }
-  
-  /**
-   * Returns the timestamp of this event. Depending on the subclass,
-   * this may be part of the version tag, or a separate field.
-   */
-  public abstract long getTimstamp();
-
-  protected boolean hasVersionTag() {
-    return getFlag(HAS_VERSION_TAG);
-  }
-
-  public Operation getOperation()
-  {
-    return this.op;
-  }
-  
-  public Object getValue() {
-    return this.valueObject;
-  }
-  
-  public boolean isPossibleDuplicate()
-  {
-    return getFlag(POSSIBLE_DUPLICATE);
-  }
-
-  /**
-   * Returns the value for reading. Only a CachedDeserializable held under the object flag
-   * is unwrapped (getDeserializedForReading); every other value is returned as stored.
-   */
-  public Object getDeserializedValue() throws IOException, ClassNotFoundException {
-    Object retVal = null;
-    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
-      // value is a byte array
-      retVal = this.valueObject;
-    } else if (getFlag(VALUE_IS_OBJECT)) {
-      if(valueObject instanceof CachedDeserializable) {
-        retVal = ((CachedDeserializable)valueObject).getDeserializedForReading();
-      } else {
-        retVal = valueObject;
-      }
-    }
-    else {
-      // neither type flag is set; return the value as stored
-      retVal = this.valueObject;
-    }
-    return retVal;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder str = new StringBuilder(PersistedEventImpl.class.getSimpleName());
-    str.append("@").append(System.identityHashCode(this))
-    .append(" op:").append(op)
-    .append(" valueObject:").append(valueObject)
-    .append(" isPossibleDuplicate:").append(getFlag(POSSIBLE_DUPLICATE));
-    return str.toString();
-  }
-
-  public void copy(PersistedEventImpl usersValue) {
-    this.op = usersValue.op;
-    this.valueObject = usersValue.valueObject;
-    this.flags = usersValue.flags;
-  }
-  
-  // Returns valueSize plus two bytes (op and flags); keySize and versionTag are not used here.
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    int size = 0;
-    
-    // value length
-    size += valueSize; 
-
-    // one byte for op and one byte for flag
-    size += 2;
-    
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
deleted file mode 100644
index bd7994c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-public interface QueuedPersistentEvent {
-  // Returns the key bytes of the queued event; the implementations in this package return the serialized key.
-  public byte[] getRawKey();
-  // Writes the event to out; the implementations in this package call the parent toData, which omits the key bytes.
-  public void toHoplogEventBytes(DataOutput out) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
deleted file mode 100644
index b97bdb7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Tracks flushes using a queue of latches.
- * A flush is released once the delivered count reaches the received count sampled by flush(), or on clear().
- */
-public class SignalledFlushObserver implements FlushObserver {
-  private static class FlushLatch extends CountDownLatch {
-    private final long seqnum;
-    
-    public FlushLatch(long seqnum) {
-      super(1);
-      this.seqnum = seqnum;
-    }
-    
-    public long getSequence() {
-      return seqnum;
-    }
-  }
-  
-  // assume the number of outstanding flush requests is small so we don't
-  // need to organize by seqnum; every access is inside synchronized (signals)
-  private final List<FlushLatch> signals;
-  
-  private final AtomicLong eventsReceived;
-  private final AtomicLong eventsDelivered;
-  
-  public SignalledFlushObserver() {
-    signals = new ArrayList<FlushLatch>();
-    eventsReceived = new AtomicLong(0);
-    eventsDelivered = new AtomicLong(0);
-  }
-  // Returns true while at least one flush request is still waiting.
-  @Override
-  public boolean shouldDrainImmediately() {
-    synchronized (signals) {
-      return !signals.isEmpty();
-    }
-  }
-  // The result is already complete when nothing is outstanding; otherwise it waits on a newly queued latch.
-  @Override
-  public AsyncFlushResult flush() {
-    final long seqnum = eventsReceived.get();
-    synchronized (signals) {
-      final FlushLatch flush;
-      if (seqnum <= eventsDelivered.get()) {
-        flush = null;
-      } else {
-        flush = new FlushLatch(seqnum);
-        signals.add(flush);
-      }
-      
-      return new AsyncFlushResult() {
-        @Override
-        public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
-          return flush == null ? true : flush.await(timeout, unit);
-        }
-      };
-    }
-  }
-
-  /**
-   * Invoked when an event is received; increments the received-event count.
-   */
-  public void push() {
-    eventsReceived.incrementAndGet();
-  }
-
-  /**
-   * Invoked when a batch of count events has been dispatched; releases each flush whose sequence is now covered.
-   */
-  public void pop(int count) {
-    long highmark = eventsDelivered.addAndGet(count);
-    synchronized (signals) {
-      for (ListIterator<FlushLatch> iter = signals.listIterator(); iter.hasNext(); ) {
-        FlushLatch flush = iter.next();
-        if (flush.getSequence() <= highmark) {
-          flush.countDown();
-          iter.remove();
-        }
-      }
-    }
-  }
-  
-  /**
-   * Invoked when the queue is cleared; releases all pending flushes and resets both counters to zero.
-   */
-  public void clear() {
-    synchronized (signals) {
-      for (FlushLatch flush : signals) {
-        flush.countDown();
-      }
-
-      signals.clear();
-      eventsReceived.set(0);
-      eventsDelivered.set(0);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
deleted file mode 100644
index c725ce5..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.DataSerializableFixedID;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * A persistent event that is stored in the hoplog queue. This class is only used
- * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
- * record in the file.
- *
- * Adds the serialized key to the fields written by SortedHoplogPersistedEvent.
- */
-public class SortedHDFSQueuePersistedEvent extends SortedHoplogPersistedEvent implements QueuedPersistentEvent {
-  
-  
-  /** key stored in serialized form */
-  protected byte[] keyBytes = null;
-  
-  public SortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
-  ClassNotFoundException {
-    this(in.getSerializedValue(), in.getOperation(), in.getValueIsObject(), in
-        .getPossibleDuplicate(), in.getVersionTag(), in.getSerializedKey(), in
-        .getCreationTime());
-  }
-
-  public SortedHDFSQueuePersistedEvent(Object valueObject, Operation operation,
-      byte valueIsObject, boolean possibleDuplicate, VersionTag versionTag,
-      byte[] serializedKey, long timestamp) throws ClassNotFoundException, IOException {
-    super(valueObject, operation, valueIsObject, possibleDuplicate, versionTag, timestamp);
-    this.keyBytes = serializedKey;
-    // the key is kept alongside the event: toData appends it, toHoplogEventBytes does not
-  }
-
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeByteArray(this.keyBytes, out);
-  }
-
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.keyBytes = DataSerializer.readByteArray(in);
-  }
-
-  @Override
-  public void toHoplogEventBytes(DataOutput out) throws IOException {
-    super.toData(out);
-  }
-
-  public byte[] getRawKey() {
-    return this.keyBytes;
-  }
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    
-    int size = SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
-    
-    size += keySize;
-    
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
deleted file mode 100644
index e8be7b8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.internal.ByteArrayDataInput;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-
-/**
- * A persistent event that is stored in a sorted hoplog. In addition
- * to the fields of PersistedEventImpl, this event has a version tag.
- *
- * This class should only be serialized by directly calling toData,
- * which is why it does not implement DataSerializable.
- * toData writes the version tag when one is set, otherwise the timestamp as a long.
- */
-public class SortedHoplogPersistedEvent extends PersistedEventImpl {
-  /** version tag for concurrency checks */
-  protected VersionTag versionTag;
-
-  /** timestamp of the event. Used when version checks are disabled*/
-  protected long timestamp;
-
-  public SortedHoplogPersistedEvent(Object valueObject, Operation operation,
-      byte valueIsObject, boolean possibleDuplicate, VersionTag tag, long timestamp) throws ClassNotFoundException, IOException {
-    super(valueObject, operation, valueIsObject, possibleDuplicate, tag != null);
-    this.versionTag = tag;
-    this.timestamp = timestamp;
-  }
-
-  public SortedHoplogPersistedEvent() {
-    //for deserialization
-  }
-
-  @Override
-  public long getTimstamp() {
-    return versionTag == null ? timestamp : versionTag.getVersionTimeStamp();
-  }
-  
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    if (versionTag == null) {
-      out.writeLong(timestamp);
-    } else {
-      //TODO optimize these
-      DataSerializer.writeObject(this.versionTag, out);
-    }
-  }
-
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    if (hasVersionTag()) {
-      this.versionTag = (VersionTag)DataSerializer.readObject(in);
-    } else {
-      this.timestamp = in.readLong();
-    }
-  }
-  
-  /**
-   * @return the concurrency versioning tag for this event, if any
-   */
-  public VersionTag getVersionTag() {
-    return this.versionTag;
-  }
-  
-  public static SortedHoplogPersistedEvent fromBytes(byte[] val)
-      throws IOException, ClassNotFoundException {
-    ByteArrayDataInput in = new ByteArrayDataInput();
-    in.initialize(val, null);
-    SortedHoplogPersistedEvent event = new SortedHoplogPersistedEvent();
-    event.fromData(in);
-    return event;
-  }
-  // usersValue must be a SortedHoplogPersistedEvent; the casts below fail otherwise.
-  public void copy(PersistedEventImpl usersValue) {
-    super.copy(usersValue);
-    this.versionTag = ((SortedHoplogPersistedEvent) usersValue).versionTag;
-    this.timestamp = ((SortedHoplogPersistedEvent) usersValue).timestamp;
-  }
-  
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
-    
-    if (versionTag != null) {
-      size +=  versionTag.getSizeInBytes();
-    } else {
-      // size of Timestamp
-      size += 8;
-    }
-    
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
deleted file mode 100644
index 93d596b..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-
-
-/**
- * A persistent event that is stored in the hoplog queue. This class is only used
- * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
- * record in the file.
- *
- * Adds the serialized key to the fields written by UnsortedHoplogPersistedEvent.
- */
-public class UnsortedHDFSQueuePersistedEvent extends UnsortedHoplogPersistedEvent implements QueuedPersistentEvent {
-  
-  /** the bytes of the key for this entry */
-  protected byte[] keyBytes = null;
-  // Uses the event's version timestamp when it is non-zero, otherwise its creation time.
-  public UnsortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
-  ClassNotFoundException {
-    super(in.getValue(), in.getOperation(), in.getValueIsObject(), in.getPossibleDuplicate(), 
-        in.getVersionTimeStamp() == 0 ? in.getCreationTime() : in.getVersionTimeStamp());
-    this.keyBytes = in.getSerializedKey();
-    
-  }
-
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeByteArray(this.keyBytes, out);
-  }
-
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.keyBytes = DataSerializer.readByteArray(in);
-  }
-  // Writes the event without the key bytes by calling the parent toData.
-  @Override
-  public void toHoplogEventBytes(DataOutput out) throws IOException {
-    super.toData(out);
-  }
-  
-  public byte[] getRawKey() {
-    return this.keyBytes;
-  }
-  
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    
-    int size = UnsortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
-    
-    size += keySize;
-    
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
deleted file mode 100644
index 9b9a04d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.internal.ByteArrayDataInput;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-
-/**
- * A persisted event that is stored in an unsorted (sequential) hoplog. This
- * does not have a version stamp, but just a timestamp for the entry.
- *
- * This class should only be serialized by calling toData directly, which
- * is why it does not implement DataSerializable.
- *
- * toData appends the timestamp after the fields written by PersistedEventImpl.
- */
-public class UnsortedHoplogPersistedEvent extends PersistedEventImpl {
-  long timestamp;
-  
-  
-
-  public UnsortedHoplogPersistedEvent() {
-    //for deserialization
-  }
-
-  public UnsortedHoplogPersistedEvent(Object value, Operation op,
-      byte valueIsObject, boolean isPossibleDuplicate, long timestamp) throws IOException,
-      ClassNotFoundException {
-    super(value, op, valueIsObject, isPossibleDuplicate, false/*hasVersionTag*/);
-    this.timestamp = timestamp;
-  }
-
-  @Override
-  public long getTimstamp() {
-    return timestamp;
-  }
-
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeLong(timestamp, out);
-  }
-
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.timestamp = DataSerializer.readLong(in);
-  }
-  
-  public static UnsortedHoplogPersistedEvent fromBytes(byte[] val)
-      throws IOException, ClassNotFoundException {
-    ByteArrayDataInput in = new ByteArrayDataInput();
-    in.initialize(val, null);
-    UnsortedHoplogPersistedEvent event = new UnsortedHoplogPersistedEvent();
-    event.fromData(in);
-    return event;
-  }
-  // usersValue must be an UnsortedHoplogPersistedEvent; the cast below fails otherwise.
-  public void copy(PersistedEventImpl usersValue) {
-    super.copy(usersValue);
-    this.timestamp = ((UnsortedHoplogPersistedEvent) usersValue).timestamp;
-  }
-  
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
-    
-    // size of Timestamp
-    size += 8;
-    
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
deleted file mode 100644
index d2fdbe7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.regex.Matcher;
-
-import com.gemstone.gemfire.internal.hll.ICardinality;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.CompressionType;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Abstract class for {@link Hoplog} with common functionality
- */
-public abstract class AbstractHoplog implements Hoplog {
-  protected final FSProvider fsProvider;
-  
-  // path of the oplog file
-  protected volatile Path path;
-  private volatile HoplogDescriptor hfd;
-  protected Configuration conf;
-  protected SortedOplogStatistics stats;
-  protected Long hoplogModificationTime;
-  protected Long hoplogSize;
-
-  protected HoplogReaderActivityListener readerListener;
-  
-  // logger instance
-  protected static final Logger logger = LogService.getLogger();
-  
-  protected static String logPrefix;
-  // THIS CONSTRUCTOR SHOULD BE USED FOR LONER ONLY
-  //
-  // Creates a hoplog backed directly by the supplied file system instead of an HDFS store.
-  // Note that logPrefix is a static field: every construction overwrites the prefix that
-  // all AbstractHoplog instances log with.
-  AbstractHoplog(FileSystem inputFS, Path filePath, SortedOplogStatistics stats)
-      throws IOException {
-    logPrefix = "<" + filePath.getName() + "> ";
-    this.fsProvider = new FSProvider(inputFS);
-    initialize(filePath, stats, inputFS);
-  }
-
-  /**
-   * Creates a hoplog whose file system is obtained from the given HDFS store.
-   * Note that {@code logPrefix} is a static field: every construction overwrites the
-   * prefix that all AbstractHoplog instances log with.
-   *
-   * @param store store providing the file system
-   * @param filePath path of the hoplog file; it is qualified against the store's file system
-   * @param stats statistics object kept for use by this hoplog
-   * @throws IOException if the store's file system cannot be obtained
-   */
-  public AbstractHoplog(HDFSStoreImpl store, Path filePath,
-      SortedOplogStatistics stats) throws IOException {
-    logPrefix = "<" + filePath.getName() + "> ";
-    this.fsProvider = new FSProvider(store);
-    initialize(filePath, stats, store.getFileSystem());
-  }
-
-  private void initialize(Path path, SortedOplogStatistics stats, FileSystem fs) {
-    this.conf = fs.getConf();
-    this.stats = stats;
-    this.path = fs.makeQualified(path);
-    this.hfd = new HoplogDescriptor(this.path.getName());
-  }
-  
-  // Closes this hoplog. rename() and delete() call this before they touch the file;
-  // rename() relies on the reader being re-created lazily afterwards.
-  @Override
-  public abstract void close() throws IOException; 
-  // Returns a reader for this hoplog.
-  @Override
-  public abstract HoplogReader getReader() throws IOException;
-
-  // Creates a writer for this hoplog.
-  // NOTE(review): "keys" is presumably the expected number of keys to be written - confirm.
-  @Override
-  public abstract HoplogWriter createWriter(int keys) throws IOException;
-
-  // Variant of close().
-  // NOTE(review): clearCache presumably controls whether cached data is dropped - confirm.
-  @Override
-  abstract public void close(boolean clearCache) throws IOException;
-
-  /**
-   * Stores the listener in {@code readerListener}, replacing any previously set listener.
-   */
-  @Override
-  public void setReaderActivityListener(HoplogReaderActivityListener listener) {
-    this.readerListener = listener;
-  }
-  
-  /**
-   * Returns the file name held by the current descriptor, i.e. the last component of the
-   * hoplog path (the name changes after {@code rename}).
-   */
-  @Override
-  public String getFileName() {
-    return this.hfd.getFileName();
-  }
-  
-  /**
-   * Orders hoplogs by their descriptors (see {@link HoplogDescriptor#compareTo}).
-   *
-   * @param o hoplog to compare with; it must be an {@code AbstractHoplog}, otherwise the
-   *        cast fails with a {@code ClassCastException}
-   */
-  public final int compareTo(Hoplog o) {
-    final AbstractHoplog other = (AbstractHoplog) o;
-    return hfd.compareTo(other.hfd);
-  }
-
-  /**
-   * Default implementation: no estimate is available, so {@code null} is returned.
-   * Callers must be prepared for a null result unless a subclass overrides this method.
-   */
-  @Override
-  public ICardinality getEntryCountEstimate() throws IOException {
-    return null;
-  }
-  
-  /**
-   * Renames the underlying file to {@code name} inside its current parent directory and
-   * re-points this hoplog (path, descriptor and log prefix) at the new file.
-   *
-   * @param name new file name; a new {@link HoplogDescriptor} is parsed from it, so it is
-   *        expected to follow the hoplog naming pattern
-   * @throws IOException if the file system reports that the rename did not take place, or
-   *         if the file system or {@code close()} fails
-   */
-  @Override
-  public synchronized void rename(String name) throws IOException {
-    if (logger.isDebugEnabled())
-      logger.debug("{}Renaming hoplog to " + name, logPrefix);
-    Path parent = path.getParent();
-    Path newPath = new Path(parent, name);
-    // FileSystem.rename reports failure through its return value; without this check the
-    // hoplog would be re-pointed at a file that does not exist
-    if (!fsProvider.getFS().rename(path, newPath)) {
-      throw new IOException("Failed to rename hoplog " + path + " to " + newPath);
-    }
-
-    // close the old reader and let the new one get created lazily
-    close();
-    
-    // update path to point to the new path
-    path = newPath;
-    this.hfd = new HoplogDescriptor(this.path.getName());
-    logPrefix = "<" + path.getName() + "> ";
-  }
-  
-  /**
-   * Closes this hoplog, forgets the cached size and modification time and removes the
-   * file (non-recursively) from the file system.
-   *
-   * @throws IOException if closing or the file system operation fails
-   */
-  @Override
-  public synchronized void delete() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Deleting hoplog", logPrefix);
-    }
-    close();
-
-    // cached metadata is meaningless once the file is gone
-    hoplogModificationTime = null;
-    hoplogSize = null;
-
-    final boolean recursive = false;
-    fsProvider.getFS().delete(path, recursive);
-  }
-
-  /**
-   * Returns the modification time of the hoplog file, loading it from the file system on
-   * first use.
-   *
-   * @throws IllegalStateException if the modification time could not be determined; for
-   *         an existing hoplog it is always available, so calling this method on a
-   *         missing file is invalid
-   */
-  @Override
-  public long getModificationTimeStamp() {
-    initHoplogSizeTimeInfo();
-
-    if (hoplogModificationTime != null) {
-      return hoplogModificationTime;
-    }
-    throw new IllegalStateException();
-  }
-
-  /**
-   * Returns the length of the hoplog file, loading it from the file system on first use.
-   *
-   * @throws IllegalStateException if the size could not be determined; for an existing
-   *         hoplog it is always available, so calling this method on a missing file is
-   *         invalid
-   */
-  @Override
-  public long getSize() {
-    initHoplogSizeTimeInfo();
-
-    if (hoplogSize != null) {
-      return hoplogSize;
-    }
-    throw new IllegalStateException();
-  }
-  
-  private synchronized void initHoplogSizeTimeInfo() {
-    if (hoplogSize != null && hoplogModificationTime != null) {
-      // time and size info is already initialized. no work needed here
-      return;
-    }
-
-    try {
-      FileStatus[] filesInfo = FSUtils.listStatus(fsProvider.getFS(), path, null);
-      if (filesInfo != null && filesInfo.length == 1) {
-        this.hoplogModificationTime = filesInfo[0].getModificationTime();
-        this.hoplogSize = filesInfo[0].getLen();
-      }
-      // TODO else condition may happen if user deletes hoplog from the file system.
-    } catch (IOException e) {
-      logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
-      throw new HDFSIOException(
-          LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),e);
-    }
-  }
-  /**
-   * Convenience overload that creates a sequence file writer without an explicit version;
-   * it delegates to the four-argument variant with a {@code null} version.
-   */
-  public static SequenceFile.Writer getSequenceFileWriter(Path path, 
-      Configuration conf, Logger logger) throws IOException {
-    return getSequenceFileWriter(path,conf, logger, null); 
-  }
-  
-  /**
-   * Creates a sequence file writer for {@code path} with {@code BytesWritable} keys and
-   * values, the compression selected by {@code withCompression} and a metadata entry
-   * recording the GemFire version ordinal.
-   *
-   * @param path file to be written
-   * @param conf Hadoop configuration handed to the writer
-   * @param logger logger used for debug output
-   * @param version - is being used only for testing. Should be passed as null for other
-   *        purposes, in which case {@code Version.CURRENT} is recorded.
-   * @return SequenceFile.Writer
-   * @throws IOException if the writer cannot be created
-   */
-  public static SequenceFile.Writer getSequenceFileWriter(Path path, 
-    Configuration conf, Logger logger, Version version) throws IOException {
-    final Option fileOption = SequenceFile.Writer.file(path);
-    final Option keyOption = SequenceFile.Writer.keyClass(BytesWritable.class);
-    final Option valueOption = SequenceFile.Writer.valueClass(BytesWritable.class);
-    final Option compressionOption = withCompression(logger);
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Started creating hoplog " + path, logPrefix);
-    }
-
-    final Version recordedVersion = (version == null) ? Version.CURRENT : version;
-
-    // Create a metadata option with the gemfire version, for future versioning
-    // of the key and value format
-    final SequenceFile.Metadata metadata = new SequenceFile.Metadata();
-    final Text versionKey = new Text(Meta.GEMFIRE_VERSION.name());
-    final Text versionValue = new Text(String.valueOf(recordedVersion.ordinal()));
-    metadata.set(versionKey, versionValue);
-    final Option metadataOption = SequenceFile.Writer.metadata(metadata);
-
-    return SequenceFile.createWriter(conf, fileOption, keyOption, valueOption,
-        compressionOption, metadataOption);
-  }
-  
-  /**
-   * Builds the compression option for a new sequence file from the system property named
-   * by {@code HoplogConfig.COMPRESSION}. Recognised values, compared case-insensitively,
-   * are {@code SNAPPY}, {@code LZ4} and {@code GZ}; they select block compression with the
-   * matching codec. When the property is not set, no compression is used.
-   *
-   * @param logger logger used for debug output
-   * @return the compression option to pass to the sequence file writer
-   * @throws IllegalStateException if the property holds an unrecognised codec name
-   */
-  private static Option withCompression(Logger logger) {
-    String prop = System.getProperty(HoplogConfig.COMPRESSION);
-    if (prop == null) {
-      return SequenceFile.Writer.compression(CompressionType.NONE, null);
-    }
-
-    CompressionCodec codec;
-    if (prop.equalsIgnoreCase("SNAPPY")) {
-      codec = new SnappyCodec();
-    } else if (prop.equalsIgnoreCase("LZ4")) {
-      codec = new Lz4Codec();
-    } else if (prop.equalsIgnoreCase("GZ")) {
-      // matched case-insensitively like the other codec names
-      codec = new GzipCodec();
-    } else {
-      throw new IllegalStateException("Unsupported codec: " + prop);
-    }
-    if (logger.isDebugEnabled())
-      logger.debug("{}Using compression codec " + codec, logPrefix);
-    return SequenceFile.Writer.compression(CompressionType.BLOCK, codec);
-  }
-  
-  public static final class HoplogDescriptor implements Comparable<HoplogDescriptor> {
-     private final String fileName;
-     private final String bucket;
-     private final int sequence;
-     private final long timestamp;
-     private final String extension;
-     
-     HoplogDescriptor(final String fileName) {
-       this.fileName = fileName;
-       final Matcher matcher = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(fileName);
-       final boolean matched = matcher.find();
-       assert matched;
-       this.bucket = matcher.group(1);
-       this.sequence = Integer.valueOf(matcher.group(3));
-       this.timestamp = Long.valueOf(matcher.group(2)); 
-       this.extension = matcher.group(4);
-     }
-     
-     public final String getFileName() {
-       return fileName;
-     }
-     
-     @Override
-     public boolean equals(Object o) {
-       if (this == o) {
-         return true;
-       }
-       
-       if (!(o instanceof HoplogDescriptor)) {
-         return false;
-       }
-       
-       final HoplogDescriptor other = (HoplogDescriptor)o;
-       // the two files should belong to same bucket
-       assert this.bucket.equals(other.bucket);
-       
-       // compare sequence first
-       if (this.sequence != other.sequence) {
-         return false;
-       }
-       
-       // sequence is same, compare timestamps
-       if (this.timestamp != other.timestamp) {
-         return false;
-       }
-       
-       return extension.equals(other.extension);
-     }
-
-    @Override
-    public int compareTo(HoplogDescriptor o) {
-      if (this == o) {
-        return 0;
-      }
-      
-      // the two files should belong to same bucket
-      assert this.bucket.equals(o.bucket);
-      
-      // compare sequence first
-      if (sequence > o.sequence) {
-        return -1;
-      } else if (sequence < o.sequence) {
-        return 1;
-      }
-      
-      // sequence is same, compare timestamps
-      if(timestamp > o.timestamp) {
-        return -1; 
-      } else if (timestamp < o.timestamp) {
-        return 1;
-      }
-      
-      //timestamp is the same, compare the file extension. It's
-      //possible a major compaction and minor compaction could finish
-      //at the same time and create the same timestamp and sequence number
-      //it doesn't matter which file we look at first in that case.
-      return extension.compareTo(o.extension);
-    }
-     
-     
-  }
-  
-  /**
-   * Supplies the file system used by a hoplog. It is backed either by a fixed
-   * {@code FileSystem} or by an {@code HDFSStoreImpl}; exactly one of the two fields is
-   * non-null.
-   */
-  protected static final class FSProvider {
-    final FileSystem fs;
-    final HDFSStoreImpl store;
-
-    // THIS METHOD IS FOR TESTING ONLY
-    FSProvider(FileSystem fs) {
-      this.store = null;
-      this.fs = fs;
-    }
-
-    FSProvider(HDFSStoreImpl store) {
-      this.fs = null;
-      this.store = store;
-    }
-
-    /**
-     * Returns the store's file system when a store is present, the fixed file system
-     * otherwise.
-     */
-    public FileSystem getFS() throws IOException {
-      return (store != null) ? store.getFileSystem() : fs;
-    }
-
-    /**
-     * Asks the store to check and clear its file system and returns the store's cached
-     * file system afterwards.
-     * NOTE(review): this dereferences the store unconditionally, so it fails with a
-     * NullPointerException on a provider built from a plain FileSystem - confirm that
-     * such providers never reach this method.
-     */
-    public FileSystem checkFileSystem() {
-      store.checkAndClearFileSystem();
-      return store.getCachedFileSystem();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
deleted file mode 100644
index 4f078d8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog.HoplogDescriptor;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-
-public abstract class AbstractHoplogOrganizer<T extends PersistedEventImpl> implements HoplogOrganizer<T> {
-
-  public static final String MINOR_HOPLOG_EXTENSION = ".ihop";
-  public static final String MAJOR_HOPLOG_EXTENSION = ".chop";
-  public static final String EXPIRED_HOPLOG_EXTENSION = ".exp";
-  public static final String TEMP_HOPLOG_EXTENSION = ".tmp";
-
-  public static final String FLUSH_HOPLOG_EXTENSION = ".hop";
-  public static final String SEQ_HOPLOG_EXTENSION = ".shop";
-
-  // all valid hoplogs will follow the following name pattern
-  public static final String HOPLOG_NAME_REGEX = "(.+?)-(\\d+?)-(\\d+?)";
-  public static final Pattern HOPLOG_NAME_PATTERN = Pattern.compile(HOPLOG_NAME_REGEX
-      + "\\.(.*)");
-  
-  public static boolean JUNIT_TEST_RUN = false; 
-
-  protected static final boolean ENABLE_INTEGRITY_CHECKS = Boolean
-      .getBoolean("gemfire.HdfsSortedOplogOrganizer.ENABLE_INTEGRITY_CHECKS")
-      || assertionsEnabled();
-
-  /**
-   * Detects whether Java assertions are enabled for this class.
-   *
-   * @return true when the JVM runs this class with assertions enabled
-   */
-  private static boolean assertionsEnabled() {
-    boolean enabled = false;
-    // intentional side effect: the assignment only executes when assertions are enabled
-    assert enabled = true;
-    return enabled;
-  }
-
-  protected HdfsRegionManager regionManager;
-  // name or id of bucket managed by this organizer
-  protected final String regionFolder;
-  protected final int bucketId;
-
-  // path of the region directory
-  protected final Path basePath;
-  // identifies path of directory containing a bucket's oplog files
-  protected final Path bucketPath;
-
-  protected final HDFSStoreImpl store;
-
-  // assigns a unique increasing number to each oplog file
-  protected AtomicInteger sequence;
-
-  //logger instance
-  protected static final Logger logger = LogService.getLogger();
-  protected final String logPrefix;
-
-  protected SortedOplogStatistics stats;
-  AtomicLong bucketDiskUsage = new AtomicLong(0);
-
-  // creation of new files and expiration of files will be synchronously
-  // notified to the listener
-  protected HoplogListener listener;
-
-  private volatile boolean closed = false;
-  
-  protected Object changePrimarylockObject = new Object();
-  
-  /**
-   * Creates an organizer for one bucket of the given region. Region folder, store,
-   * listener and statistics are taken from the region manager; the bucket directory is
-   * {@code <store home dir>/<region folder>/<bucketId>}.
-   *
-   * The {@code sequence} counter is not initialized here.
-   * NOTE(review): presumably subclasses assign it before getTmpSortedOplog is used - confirm.
-   *
-   * @param region region manager owning the bucket; must not be null
-   * @param bucketId id of the bucket managed by this organizer
-   */
-  public AbstractHoplogOrganizer(HdfsRegionManager region, int bucketId) {
-
-    assert region != null;
-
-    this.regionManager = region;
-    this.regionFolder = region.getRegionFolder();
-    this.store = region.getStore();
-    this.listener = region.getListener();
-    this.stats = region.getHdfsStats();
-    
-    this.bucketId = bucketId;
-
-    this.basePath = new Path(store.getHomeDir());
-    this.bucketPath = new Path(basePath, regionFolder + "/" + bucketId);
-
-    // prefix used in this organizer's log messages, e.g. "<regionFolder/bucketId> "
-    this.logPrefix = "<" + getRegionBucketStr() + "> ";
-    
-  }
-
-  /**
-   * Returns true once {@code close()} has been called on this organizer or when the
-   * owning region manager reports itself closed.
-   */
-  @Override
-  public boolean isClosed() {
-    return closed || regionManager.isClosed();
-  }
-  
-  /**
-   * Marks this organizer closed and subtracts the bucket's recorded disk usage from the
-   * store usage statistic.
-   */
-  @Override
-  public void close() throws IOException {
-    closed = true;
-
-    // this bucket is closed and may be owned by a new node. So reduce the store
-    // usage stat, as the new owner adds the usage metric
-    final long recordedUsage = bucketDiskUsage.get();
-    incrementDiskUsage(-recordedUsage);
-  }
-
-  // Flushes the queued events supplied by the iterator.
-  // NOTE(review): "count" is presumably the number of events in the iterator - confirm.
-  @Override
-  public abstract void flush(Iterator<? extends QueuedPersistentEvent> bufferIter,
-      int count) throws IOException, ForceReattemptException;
-
-  // Implemented by subclasses; this class defines no behavior for clear().
-  @Override
-  public abstract void clear() throws IOException;
-
-  // Factory hook: returns the Hoplog object for the given file path. It is used by
-  // getTmpSortedOplog to obtain the handle for a new temporary file.
-  protected abstract Hoplog getHoplog(Path hoplogPath) throws IOException;
-
-  // The operations below are not supported by this base class; each one throws
-  // UnsupportedOperationException unless a subclass overrides it.
-
-  /** Builds the message shared by all unsupported operations. */
-  private String unsupportedMessage() {
-    return "Not supported for " + this.getClass().getSimpleName();
-  }
-
-  @Override
-  public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
-      throws IOException {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  @Override
-  public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs)
-      throws IOException {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  @Override
-  public void compactionCompleted(String region, int bucket, boolean isMajor) {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  @Override
-  public T read(byte[] key) throws IOException {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  @Override
-  public HoplogIterator<byte[], T> scan() throws IOException {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  @Override
-  public HoplogIterator<byte[], T> scan(byte[] from, byte[] to)
-      throws IOException {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  @Override
-  public HoplogIterator<byte[], T> scan(byte[] from,
-      boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  @Override
-  public long sizeEstimate() {
-    throw new UnsupportedOperationException(unsupportedMessage());
-  }
-
-  /**
-   * @return returns an oplogs full path after prefixing bucket path to the file
-   *         name
-   */
-  protected String getPathStr(Hoplog oplog) {
-    return bucketPath.toString() + "/" + oplog.getFileName();
-  }
-
-  /**
-   * @return {@code <regionFolder>/<bucketId>}; used for the log prefix and as the bucket
-   *         directory name relative to the store home
-   */
-  protected String getRegionBucketStr() {
-    return regionFolder + "/" + bucketId;
-  }
-
-  /**
-   * Deserializes a stored value into a {@code SortedHoplogPersistedEvent}.
-   *
-   * @param val serialized event bytes
-   * @return the deserialized event, or {@code null} when deserialization fails with a
-   *         {@code ClassNotFoundException} (the failure is logged, not rethrown), so
-   *         callers must handle a null result
-   * @throws IOException if the bytes cannot be read
-   */
-  protected SortedHoplogPersistedEvent deserializeValue(byte[] val) throws IOException {
-    try {
-      return SortedHoplogPersistedEvent.fromBytes(val);
-    } catch (ClassNotFoundException e) {
-      logger
-          .error(
-              LocalizedStrings.GetMessage_UNABLE_TO_DESERIALIZE_VALUE_CLASSNOTFOUNDEXCEPTION,
-              e);
-      return null;
-    }
-  }
-
-  /**
-   * @return true if the entry belongs to an destroy event
-   */
-  protected boolean isDeletedEntry(byte[] value, int offset) throws IOException {
-    // Read only the first byte of PersistedEventImpl for the operation
-    assert value != null && value.length > 0 && offset >= 0 && offset < value.length;
-    Operation op = Operation.fromOrdinal(value[offset]);
-
-    if (op.isDestroy() || op.isInvalidate()) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * @param seqNum
-   *          desired sequence number of the hoplog. If null a highest number is
-   *          choosen
-   * @param extension
-   *          file extension representing the type of file, e.g. ihop for
-   *          intermediate hoplog
-   * @return a new temporary file for a new sorted oplog. The name consists of
-   *         bucket name, a sequence number for ordering the files followed by a
-   *         timestamp
-   */
-  Hoplog getTmpSortedOplog(Integer seqNum, String extension) throws IOException {
-    if (seqNum == null) {
-      seqNum = sequence.incrementAndGet();
-    }
-    String name = bucketId + "-" + System.currentTimeMillis() + "-" + seqNum 
-        + extension;
-    Path soplogPath = new Path(bucketPath, name + TEMP_HOPLOG_EXTENSION);
-    return getHoplog(soplogPath);
-  }
-  
-  /**
-   * renames a temporary hoplog file to a legitimate name.
-   */
-  static void makeLegitimate(Hoplog so) throws IOException {
-    String name = so.getFileName();
-    assert name.endsWith(TEMP_HOPLOG_EXTENSION);
-
-    int index = name.lastIndexOf(TEMP_HOPLOG_EXTENSION);
-    name = name.substring(0, index);
-    so.rename(name);
-  }
-
-  /**
-   * creates a expiry marker for a file on file system
-   * 
-   * @param hoplog
-   * @throws IOException
-   */
-  protected void addExpiryMarkerForAFile(Hoplog hoplog) throws IOException {
-    FileSystem fs = store.getFileSystem();
-
-    // TODO optimization needed here. instead of creating expired marker
-    // file per file, create a meta file. the main thing to worry is
-    // compaction of meta file itself
-    Path expiryMarker = getExpiryMarkerPath(hoplog.getFileName());
-
-    // uh-oh, why are we trying to expire an already expired file?
-    if (ENABLE_INTEGRITY_CHECKS) {
-      Assert.assertTrue(!fs.exists(expiryMarker),
-          "Expiry marker already exists: " + expiryMarker);
-    }
-
-    FSDataOutputStream expiryMarkerFile = fs.create(expiryMarker);
-    expiryMarkerFile.close();
-
-    if (logger.isDebugEnabled())
-      logger.debug("Hoplog marked expired: " + getPathStr(hoplog));
-  }
-
-  /**
-   * @param name file name of a hoplog
-   * @return path of the expiry marker for that file: {@code name} plus the expired
-   *         extension, inside the bucket directory
-   */
-  protected Path getExpiryMarkerPath(String name) {
-    return new Path(bucketPath, name + EXPIRED_HOPLOG_EXTENSION);
-  }
-  
-  protected String truncateExpiryExtension(String name) {
-    if (name.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
-      return name.substring(0, name.length() - EXPIRED_HOPLOG_EXTENSION.length());
-    }
-    
-    return name;
-  }
-  
-  /**
-   * updates region stats and a local copy of bucket level store usage metric.
-   * 
-   * @param delta
-   */
-  protected void incrementDiskUsage(long delta) {
-    long newSize = bucketDiskUsage.addAndGet(delta);
-    if (newSize < 0 && delta < 0) {
-      if (logger.isDebugEnabled()){
-        logger.debug("{}Invalid diskUsage size:" + newSize + " caused by delta:"
-            + delta + ", parallel del & close?" + isClosed(), logPrefix);
-      }
-      if (isClosed()) {
-        // avoid corrupting disk usage size during close by reducing residue
-        // size only
-        delta = delta + (-1 * newSize);
-      }
-    }
-    stats.incStoreUsageBytes(delta);
-  }
-
-  /**
-   * Utility method to remove a file from the valid file list if an expired marker
-   * for the file exists. A marker belongs to a file when its name equals the file's
-   * name plus the expired extension.
-   * 
-   * @param valid
-   *          list of valid files
-   * @param expired
-   *          list of expired file markers
-   * @return list of valid files that do not have an expired file marker, in their
-   *         original order; {@code null} if {@code valid} is null, and {@code valid}
-   *         itself if {@code expired} is null
-   */
-  public static FileStatus[] filterValidHoplogs(FileStatus[] valid,
-      FileStatus[] expired) {
-    if (valid == null) {
-      return null;
-    }
-
-    if (expired == null) {
-      return valid;
-    }
-
-    ArrayList<FileStatus> result = new ArrayList<FileStatus>(valid.length);
-    for (FileStatus vs : valid) {
-      if (!hasExpiryMarker(vs, expired)) {
-        result.add(vs);
-      }
-    }
-
-    return result.toArray(new FileStatus[result.size()]);
-  }
-
-  /**
-   * Returns true if {@code expired} contains the expiry marker of {@code file}. The
-   * marker name is built once and the scan stops at the first match.
-   */
-  private static boolean hasExpiryMarker(FileStatus file, FileStatus[] expired) {
-    if (expired.length == 0) {
-      return false;
-    }
-
-    final String markerName = file.getPath().getName()
-        + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-    for (FileStatus ex : expired) {
-      if (ex.getPath().getName().equals(markerName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  protected void pingSecondaries() throws ForceReattemptException {
-
-    if (JUNIT_TEST_RUN)
-      return;
-    BucketRegion br = ((PartitionedRegion)this.regionManager.getRegion()).getDataStore().getLocalBucketById(this.bucketId);
-    boolean secondariesPingable = false;
-    try {
-      secondariesPingable = br.areSecondariesPingable();
-    } catch (Throwable e) {
-      throw new ForceReattemptException("Failed to ping secondary servers of bucket: " + 
-          this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()), e);
-    }
-    if (!secondariesPingable)
-      throw new ForceReattemptException("Failed to ping secondary servers of bucket: " + 
-          this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()));
-  }
-
-  
-
-  
-  /**
-   * A comparator for ordering soplogs based on the file name. The file names
-   * are assigned incrementally and hint at the age of the file.
-   */
-  public static final class HoplogComparator implements
-      Comparator<TrackedReference<Hoplog>> {
-    /**
-     * A file with a higher sequence or timestamp is the younger one and hence the
-     * smaller in this ordering.
-     */
-    @Override
-    public int compare(TrackedReference<Hoplog> o1, TrackedReference<Hoplog> o2) {
-      final Hoplog first = o1.get();
-      final Hoplog second = o2.get();
-      return first.compareTo(second);
-    }
-
-    /**
-     * Compares the age of two files based on their names and returns 1 if name1 is
-     * older, -1 if name1 is younger and 0 if the two files are of the same age.
-     */
-    public static int compareByName(String name1, String name2) {
-      return new HoplogDescriptor(name1).compareTo(new HoplogDescriptor(name2));
-    }
-  }
-
-  /**
-   * Extracts the timestamp from a hoplog file name.
-   * 
-   * @param matcher
-   *          a matcher that has already successfully matched a hoplog file name
-   *          against {@link #HOPLOG_NAME_PATTERN}
-   * @return the timestamp component of the file name (capture group 2)
-   * @throws NumberFormatException if the captured digits do not fit into a long
-   */
-  public static long getHoplogTimestamp(Matcher matcher) {
-    // parse straight to a primitive; Long.valueOf would box and immediately unbox
-    return Long.parseLong(matcher.group(2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
deleted file mode 100644
index 86e66a1..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-/**
- * Read-only view of a bloom filter over byte-array keys.
- */
-public interface BloomFilter {
-  /**
-   * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
-   * is such that false positives are allowed, but false negatives cannot occur.
-   */
-  boolean mightContain(byte[] key);
-
-  /**
-   * Returns true if the bloom filter might contain the key held in a region of the supplied array.
-   * The nature of the bloom filter is such that false positives are allowed, but false negatives
-   * cannot occur.
-   *
-   * NOTE(review): presumably the key is the keyLength bytes of {@code key} starting at keyOffset -
-   * confirm against the implementations.
-   */
-  boolean mightContain(byte[] key, int keyOffset, int keyLength);
-
-  /**
-   * @return Size of the bloom, in bytes
-   */
-  long getBloomSize();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
deleted file mode 100644
index 3f67de8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.Collection;
-
-import org.apache.hadoop.fs.FileSystem;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * For streaming case, if the bucket traffic goes down after writing few batches of data, 
- * the flush doesn't get called. In that case, the file is left in tmp state
- * until the flush restarts. To avoid this issue, added this timer task 
- * that periodically iterates over the buckets and closes their writer 
- * if the time for rollover has passed.
- * 
- * It also has got an extra responsibility of fixing the file sizes of the files 
- * that weren't closed properly last time. 
- *
- *
- */
-class CloseTmpHoplogsTimerTask extends SystemTimerTask {
-  
-  private HdfsRegionManager hdfsRegionManager;
-  private static final Logger logger = LogService.getLogger();
-  private FileSystem filesystem; 
-  
-  public CloseTmpHoplogsTimerTask(HdfsRegionManager hdfsRegionManager) {
-    this.hdfsRegionManager = hdfsRegionManager;
-    
-    // Create a new filesystem 
-    // This is added for the following reason:
-    // For HDFS, if a file wasn't closed properly last time, 
-    // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
-    // FSNamesystem.recoverLeaseInternal function gets called. 
-    // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file, 
-    // created using the same FileSystem object. This is a bug and is being tracked at: 
-    // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
-    // 
-    // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug, 
-    // we create a new file system for the timer task so that it does not encounter the bug. 
-    this.filesystem = this.hdfsRegionManager.getStore().createFileSystem();
-    if (logger.isDebugEnabled()) 
-      logger.debug("created a new file system specifically for timer task");
-  }
-
-  
-  /**
-   * Iterates over all the bucket organizers and closes their writer if the time for 
-   * rollover has passed. It also has the additional responsibility of fixing the tmp
-   * files that were left over in the last unsuccessful run. 
-   */
-  @Override
-  public void run2() {
-    Collection<HoplogOrganizer> organizers =  hdfsRegionManager.getBucketOrganizers();
-    if (logger.isDebugEnabled()) 
-      logger.debug("Starting the close temp logs run.");
-    
-    for (HoplogOrganizer organizer: organizers) {
-      
-      HDFSUnsortedHoplogOrganizer unsortedOrganizer = (HDFSUnsortedHoplogOrganizer)organizer;
-      long timeSinceLastFlush = (System.currentTimeMillis() - unsortedOrganizer.getLastFlushTime())/1000 ;
-      try {
-        this.hdfsRegionManager.getRegion().checkReadiness();
-      } catch (Exception e) {
-        break;
-      }
-      
-      try {
-        // the time since last flush has exceeded file rollover interval, roll over the 
-        // file. 
-        if (timeSinceLastFlush >= unsortedOrganizer.getfileRolloverInterval()) {
-          if (logger.isDebugEnabled()) 
-            logger.debug("Closing writer for bucket: " + unsortedOrganizer.bucketId);
-          unsortedOrganizer.synchronizedCloseWriter(false, timeSinceLastFlush, 0);
-        }
-        
-        // fix the tmp hoplogs, if any. Pass the new file system here. 
-        unsortedOrganizer.identifyAndFixTmpHoplogs(this.filesystem);
-        
-      } catch (Exception e) {
-        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
-      }
-    }
-    
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
deleted file mode 100644
index 55d8f87..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Status of the compaction task reported in the future
- * 
- */
-public class CompactionStatus implements VersionedDataSerializable {
-  /**MergeGemXDHDFSToGFE check and verify serializationversions **/
- 
-  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
-  private int bucketId;
-  private boolean status;
-
-  public CompactionStatus() {
-  }
-
-  public CompactionStatus(int bucketId, boolean status) {
-    this.bucketId = bucketId;
-    this.status = status;
-  }
-  public int getBucketId() {
-    return bucketId;
-  }
-  public boolean isStatus() {
-    return status;
-  }
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    out.writeInt(bucketId);
-    out.writeBoolean(status);
-  }
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this.bucketId = in.readInt();
-    this.status = in.readBoolean();
-  }
-  @Override
-  public Version[] getSerializationVersions() {
-    return serializationVersions;
-  }
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(getClass().getCanonicalName()).append("@")
-    .append(System.identityHashCode(this)).append(" Bucket:")
-    .append(bucketId).append(" status:").append(status);
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
deleted file mode 100644
index 84beded..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Reports the result of a flush request.
- * 
- */
-public class FlushStatus implements VersionedDataSerializable {
-  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
-  private int bucketId;
-
-  private final static int LAST = -1;
-  
-  public FlushStatus() {
-  }
-
-  public static FlushStatus last() {
-    return new FlushStatus(LAST);
-  }
-  
-  public FlushStatus(int bucketId) {
-    this.bucketId = bucketId;
-  }
-  public int getBucketId() {
-    return bucketId;
-  }
-  public boolean isLast() {
-    return bucketId == LAST;
-  }
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    out.writeInt(bucketId);
-  }
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this.bucketId = in.readInt();
-  }
-  @Override
-  public Version[] getSerializationVersions() {
-    return serializationVersions;
-  }
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(getClass().getCanonicalName()).append("@")
-    .append(System.identityHashCode(this)).append(" Bucket:")
-    .append(bucketId);
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
deleted file mode 100644
index ba191c2..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * A singleton which schedules compaction of hoplogs owned by this node as primary and manages
- * executor of ongoing compactions. Ideally the number of pending request will not exceed the number
- * of buckets in the node as hoplog organizer avoids creating a new request if compaction on the
- * bucket is active. Moreover separate queues for major and minor compactions are maintained to
- * prevent long running major compactions from preventing minor compactions.
- */
-public class HDFSCompactionManager {
-  /*
-   * Each hdfs store has its own concurrency configuration. Concurrency
-   * configuration is used by compaction manager to manage threads. This member
-   * holds hdsf-store to compaction manager mapping
-   */
-  private static final ConcurrentHashMap<String, HDFSCompactionManager> storeToManagerMap = 
-                                        new ConcurrentHashMap<String, HDFSCompactionManager>();
-
-  // hdfs store configuration used to initialize this instance
-  HDFSStore storeConfig;
-  
-  // Executor for ordered execution of minor compaction requests.
-  private final CompactionExecutor minorCompactor;
-  // Executor for ordered execution of major compaction requests.
-  private final CompactionExecutor majorCompactor;
-
-  private static final Logger logger = LogService.getLogger();
-  protected final static String logPrefix =  "<" + "HDFSCompactionManager" + "> ";;
-  
-  private HDFSCompactionManager(HDFSStore config) {
-    this.storeConfig = config;
-    // configure hdfs compaction manager
-    int capacity = Integer.getInteger(HoplogConfig.COMPCATION_QUEUE_CAPACITY,
-        HoplogConfig.COMPCATION_QUEUE_CAPACITY_DEFAULT);
-
-    minorCompactor = new CompactionExecutor(config.getMinorCompactionThreads(), capacity, "MinorCompactor_"
-        + config.getName());
-
-    majorCompactor = new CompactionExecutor(config.getMajorCompactionThreads(), capacity, "MajorCompactor_"
-        + config.getName());
-
-    minorCompactor.allowCoreThreadTimeOut(true);
-    majorCompactor.allowCoreThreadTimeOut(true);
-  }
-
-  public static synchronized HDFSCompactionManager getInstance(HDFSStore config) {
-    HDFSCompactionManager instance = storeToManagerMap.get(config.getName());
-    if (instance == null) {
-      instance = new HDFSCompactionManager(config);
-      storeToManagerMap.put(config.getName(), instance);
-    }
-    
-    return instance;
-  }
-
-  /**
-   * Accepts compaction request for asynchronous compaction execution.
-   * 
-   * @param request
-   *          compaction request with region and bucket id
-   * @return true if the request is accepted, false if the compactor is overlaoded and there is a
-   *         long wait queue
-   */
-  public synchronized Future<CompactionStatus> submitRequest(CompactionRequest request) {
-    if (!request.isForced && request.compactor.isBusy(request.isMajor)) {
-      if (logger.isDebugEnabled()) {
-        fineLog("Compactor is busy. Ignoring ", request);
-      }
-      return null;
-    }
-    
-    CompactionExecutor executor = request.isMajor ? majorCompactor : minorCompactor;
-    
-    try {
-      return executor.submit(request);
-    } catch (Throwable e) {
-      if (e instanceof CompactionIsDisabled) {
-        if (logger.isDebugEnabled()) {
-          fineLog("{}" +e.getMessage(), logPrefix);
-        }
-      } else {
-        logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, "Compaction request submission failed"), e);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Removes all pending compaction requests. Programmed for TESTING ONLY
-   */
-  public void reset() {
-    minorCompactor.shutdownNow();
-    majorCompactor.shutdownNow();
-    HDFSCompactionManager.storeToManagerMap.remove(storeConfig.getName());
-  }
-  
-  /**
-   * Returns minor compactor. Programmed for TESTING AND MONITORING ONLY  
-   */
-  public ThreadPoolExecutor getMinorCompactor() {
-    return minorCompactor;
-  }
-
-  /**
-   * Returns major compactor. Programmed for TESTING AND MONITORING ONLY  
-   */
-  public ThreadPoolExecutor getMajorCompactor() {
-    return majorCompactor;
-  }
-  
-  /**
-   * Contains important details needed for executing a compaction cycle.
-   */
-  public static class CompactionRequest implements Callable<CompactionStatus> {
-    String regionFolder;
-    int bucket;
-    Compactor compactor;
-    boolean isMajor;
-    final boolean isForced;
-    final boolean versionUpgrade;
-
-    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major) {
-      this(regionFolder, bucket, compactor, major, false);
-    }
-
-    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced) {
-      this(regionFolder, bucket, compactor, major, isForced, false);
-    }
-
-    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced, boolean versionUpgrade) {
-      this.regionFolder = regionFolder;
-      this.bucket = bucket;
-      this.compactor = compactor;
-      this.isMajor = major;
-      this.isForced = isForced;
-      this.versionUpgrade = versionUpgrade;
-    }
-
-    @Override
-    public CompactionStatus call() throws Exception {
-      HDFSStore store = compactor.getHdfsStore();
-      if (!isForced) {
-        // this is a auto generated compaction request. If auto compaction is
-        // disabled, ignore this call.
-        if (isMajor && !store.getMajorCompaction()) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}Major compaction is disabled. Ignoring request",logPrefix);
-          }
-          return new CompactionStatus(bucket, false);
-        } else if (!isMajor && !store.getMinorCompaction()) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}Minor compaction is disabled. Ignoring request", logPrefix);
-          }
-          return new CompactionStatus(bucket, false);
-        }
-      }
-
-      // all hurdles passed, execute compaction now
-      try {
-        boolean status = compactor.compact(isMajor, versionUpgrade);
-        return new CompactionStatus(bucket, status);
-      } catch (IOException e) {
-        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_COMPACTION_ERROR, bucket), e);
-      }
-      return new CompactionStatus(bucket, false);
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + bucket;
-      result = prime * result
-          + ((regionFolder == null) ? 0 : regionFolder.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      CompactionRequest other = (CompactionRequest) obj;
-      if (bucket != other.bucket)
-        return false;
-      if (regionFolder == null) {
-        if (other.regionFolder != null)
-          return false;
-      } else if (!regionFolder.equals(other.regionFolder))
-        return false;
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return "CompactionRequest [regionFolder=" + regionFolder + ", bucket="
-          + bucket + ", isMajor=" + isMajor + ", isForced="+isForced+"]";
-    }
-  }
-
-  /**
-   * Helper class for creating named instances of comapction threads and managing compaction
-   * executor. All threads wait infinitely
-   */
-  private class CompactionExecutor extends ThreadPoolExecutor implements ThreadFactory {
-    final AtomicInteger count = new AtomicInteger(1);
-    private String name;
-
-    CompactionExecutor(int max, int capacity, String name) {
-      super(max, max, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(capacity));
-      allowCoreThreadTimeOut(true);
-      setThreadFactory(this);
-      this.name = name;
-    }
-    
-    private void throwIfStopped(CompactionRequest req, HDFSStore storeConfig) {
-      // check if compaction is enabled everytime. Alter may change this value
-      // so this check is needed everytime
-      boolean isEnabled = true;
-      isEnabled = storeConfig.getMinorCompaction();
-      if (req.isMajor) {
-        isEnabled = storeConfig.getMajorCompaction();
-      }
-      if (isEnabled || req.isForced) {
-        return;
-      }
-      throw new CompactionIsDisabled(name + " is disabled");
-    }
-
-    private void throwIfPoolSizeChanged(CompactionRequest task, HDFSStore config) {
-      int threadCount = config.getMinorCompactionThreads();
-      if (task.isMajor) {
-        threadCount = config.getMajorCompactionThreads();
-      }
-      
-      if (getCorePoolSize() < threadCount) {
-        setCorePoolSize(threadCount);
-      } else if (getCorePoolSize() > threadCount) {
-        setCorePoolSize(threadCount);
-      }
-      
-      if (!task.isForced && getActiveCount() > threadCount) {
-        // the number is active threads is more than new max pool size. Throw
-        // error is this is system generated compaction request
-        throw new CompactionIsDisabled(
-            "Rejecting to reduce the number of threads for " + name
-            + ", currently:" + getActiveCount() + " target:"
-            + threadCount);
-      }
-    }
-    
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-      throwIfStopped((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
-      throwIfPoolSizeChanged((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
-      
-      if (logger.isDebugEnabled()) {
-        fineLog("New:", task, " pool:", getPoolSize(), " active:", getActiveCount());
-      }
-      return super.submit(task);
-    }
-
-    @Override
-    public Thread newThread(Runnable r) {
-      Thread thread = new Thread(r, name + ":" + count.getAndIncrement());
-      thread.setDaemon(true);
-      if (logger.isDebugEnabled()) {
-        fineLog("New thread:", name, " poolSize:", getPoolSize(),
-            " active:", getActiveCount());
-      }
-      return thread;
-    }
-  }
-  
-  public static class CompactionIsDisabled extends RejectedExecutionException {
-    private static final long serialVersionUID = 1L;
-    public CompactionIsDisabled(String name) {
-      super(name);
-    }
-  }
-  
-  
-  private void fineLog(Object... strings) {
-    if (logger.isDebugEnabled()) {
-      StringBuffer sb = new StringBuffer();
-      for (Object str : strings) {
-        sb.append(str.toString());
-      }
-      logger.debug("{}"+sb.toString(), logPrefix);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
deleted file mode 100644
index 36e171b..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Defines the arguments to the flush queue request.
- * 
- */
-@SuppressWarnings("serial")
-public class HDFSFlushQueueArgs implements VersionedDataSerializable {
-
-  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
-
-  private HashSet<Integer> buckets;
-
-  private long maxWaitTimeMillis;
-
-  public HDFSFlushQueueArgs() {
-  }
-
-  public HDFSFlushQueueArgs(Set<Integer> buckets, long maxWaitTime) {
-    this.buckets = new HashSet<Integer>(buckets);
-    this.maxWaitTimeMillis = maxWaitTime;
-  }
-
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    DataSerializer.writeHashSet(buckets, out);
-    out.writeLong(maxWaitTimeMillis);
-  }
-
-  @Override
-  public void fromData(DataInput in) throws IOException,
-      ClassNotFoundException {
-    this.buckets = DataSerializer.readHashSet(in);
-    this.maxWaitTimeMillis = in.readLong();
-  }
-
-  @Override
-  public Version[] getSerializationVersions() {
-    return serializationVersions;
-  }
-
-  public Set<Integer> getBuckets() {
-    return (Set<Integer>) buckets;
-  }
-
-  public void setBuckets(Set<Integer> buckets) {
-    this.buckets = new HashSet<Integer>(buckets);
-  }
-
-  public boolean isSynchronous() {
-    return maxWaitTimeMillis == 0;
-  }
-
-  public long getMaxWaitTime() {
-    return this.maxWaitTimeMillis;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(getClass().getCanonicalName()).append("@")
-    .append(System.identityHashCode(this))
-    .append(" buckets:").append(buckets)
-    .append(" maxWaitTime:").append(maxWaitTimeMillis);
-    return sb.toString();
-  }
-}