You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:08 UTC
[22/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
new file mode 100644
index 0000000..82e2bf9
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Event that is persisted in HDFS. As we need to persist some of the EntryEventImpl
+ * variables, we have created this class with its own toData and fromData methods.
+ *
+ * Subclasses cover the different kinds of persisted events: sorted vs. unsorted,
+ * and the persisted events we keep in the region queue, which also need to hold
+ * the region key.
+ *
+ * Serialized form written by toData: one byte of operation ordinal, one byte of flags, then the value.
+ */
+public abstract class PersistedEventImpl {
+ protected Operation op = Operation.UPDATE; // default; replaced by the value constructor, fromData and copy
+
+ protected Object valueObject; // the event value; the flags record how it is encoded
+
+ /**
+ * A field with flags describing the event
+ */
+ protected byte flags;
+
+ //Flags indicating the type of value
+ //if the value is not a byte array or object, it is an internal delta.
+ private static final byte VALUE_IS_BYTE_ARRAY= 0x01;
+ private static final byte VALUE_IS_OBJECT= (VALUE_IS_BYTE_ARRAY << 1); // 0x02
+ private static final byte POSSIBLE_DUPLICATE = (VALUE_IS_OBJECT << 1); // 0x04
+ private static final byte HAS_VERSION_TAG = (POSSIBLE_DUPLICATE << 1); // 0x08
+
+
+ /** for deserialization */
+ public PersistedEventImpl() {
+ }
+ // valueIsObject: 0x00 marks the value as a byte array, 0x01 as an object; any other value sets neither flag.
+ public PersistedEventImpl(Object value, Operation op, byte valueIsObject,
+ boolean isPossibleDuplicate, boolean hasVersionTag) throws IOException,
+ ClassNotFoundException {
+ this.op = op;
+ this.valueObject = value;
+ setFlag(VALUE_IS_BYTE_ARRAY, valueIsObject == 0x00);
+ setFlag(VALUE_IS_OBJECT, valueIsObject == 0x01);
+ setFlag(POSSIBLE_DUPLICATE, isPossibleDuplicate);
+ setFlag(HAS_VERSION_TAG, hasVersionTag);
+ }
+ // Sets the given bit in flags when set is true, clears it otherwise.
+ private void setFlag(byte flag, boolean set) {
+ flags = (byte) (set ? flags | flag : flags & ~flag);
+ }
+ // Returns true when the given bit is set in flags.
+ private boolean getFlag(byte flag) {
+ return (flags & flag) != 0x0;
+ }
+ // Writes the operation ordinal and the flags, then the value in the encoding selected by the flags.
+ public void toData(DataOutput out) throws IOException {
+ out.writeByte(this.op.ordinal);
+ out.writeByte(this.flags);
+
+ if (getFlag(VALUE_IS_BYTE_ARRAY)) {
+ DataSerializer.writeByteArray((byte[])this.valueObject, out);
+ } else if (getFlag(VALUE_IS_OBJECT)) {
+ if(valueObject instanceof CachedDeserializable) {
+ CachedDeserializable cd = (CachedDeserializable)valueObject;
+ DataSerializer.writeObjectAsByteArray(cd.getValue(), out); // serialize cd.getValue(), not the wrapper itself
+ } else {
+ DataSerializer.writeObjectAsByteArray(valueObject, out);
+ }
+ }
+ else { // neither value flag is set
+ DataSerializer.writeObject(valueObject, out);
+ }
+ }
+ // Mirror of toData: reads the operation and flags first, then decodes the value according to the flags.
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ this.op = Operation.fromOrdinal(in.readByte());
+ this.flags = in.readByte();
+
+ if (getFlag(VALUE_IS_BYTE_ARRAY)) {
+ this.valueObject = DataSerializer.readByteArray(in);
+ } else if (getFlag(VALUE_IS_OBJECT)) {
+ byte[] newValueBytes = DataSerializer.readByteArray(in);
+ if(newValueBytes == null) {
+ this.valueObject = null;
+ } else {
+ if(CachedDeserializableFactory.preferObject()) { // picks between EntryEventImpl.deserialize and the factory's create
+ this.valueObject = EntryEventImpl.deserialize(newValueBytes);
+ } else {
+ this.valueObject = CachedDeserializableFactory.create(newValueBytes);
+ }
+ }
+ }
+ else { // neither value flag is set: read back with readObject, matching toData
+ this.valueObject = DataSerializer.readObject(in);
+ }
+
+ }
+
+ /**
+ * Return the timestamp of this event. Depending on the subclass, this may be part of the
+ * version tag, or a separate field. Subclasses override it under this exact spelling.
+ */
+ public abstract long getTimstamp();
+
+ protected boolean hasVersionTag() {
+ return getFlag(HAS_VERSION_TAG);
+ }
+
+ public Operation getOperation()
+ {
+ return this.op;
+ }
+
+ public Object getValue() {
+ return this.valueObject;
+ }
+
+ public boolean isPossibleDuplicate()
+ {
+ return getFlag(POSSIBLE_DUPLICATE);
+ }
+
+ /**
+ * Returns the value, unwrapping a CachedDeserializable via getDeserializedForReading()
+ * when the value is flagged as an object; in every other case the stored value is returned as is.
+ */
+ public Object getDeserializedValue() throws IOException, ClassNotFoundException {
+ Object retVal = null;
+ if (getFlag(VALUE_IS_BYTE_ARRAY)) {
+ // value is a byte array
+ retVal = this.valueObject;
+ } else if (getFlag(VALUE_IS_OBJECT)) {
+ if(valueObject instanceof CachedDeserializable) {
+ retVal = ((CachedDeserializable)valueObject).getDeserializedForReading();
+ } else {
+ retVal = valueObject;
+ }
+ }
+ else {
+ // neither value flag is set: return the value as stored
+ retVal = this.valueObject;
+ }
+ return retVal;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder str = new StringBuilder(PersistedEventImpl.class.getSimpleName()); // class literal: always the base class name
+ str.append("@").append(System.identityHashCode(this))
+ .append(" op:").append(op)
+ .append(" valueObject:").append(valueObject)
+ .append(" isPossibleDuplicate:").append(getFlag(POSSIBLE_DUPLICATE));
+ return str.toString();
+ }
+ // Copies op, value and flags from the given event; the value reference is shared, not cloned.
+ public void copy(PersistedEventImpl usersValue) {
+ this.op = usersValue.op;
+ this.valueObject = usersValue.valueObject;
+ this.flags = usersValue.flags;
+ }
+ // Size estimate: the value length plus two bytes; keySize and versionTag are not used in this method.
+ public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+ int size = 0;
+
+ // value length
+ size += valueSize;
+
+ // one byte for op and one byte for flag
+ size += 2;
+
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
new file mode 100644
index 0000000..bd7994c
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface QueuedPersistentEvent {
+ // Implemented in this package by SortedHDFSQueuePersistedEvent and UnsortedHDFSQueuePersistedEvent.
+ public byte[] getRawKey(); // both implementers return their keyBytes field (the key in serialized form)
+ // Both implementers write only their parent's toData form here, i.e. the event without the key bytes.
+ public void toHoplogEventBytes(DataOutput out) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
new file mode 100644
index 0000000..b97bdb7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracks flushes using a queue of latches.
+ * A flush completes once pop(int) has delivered every event counted by push() before the flush, or on clear().
+ */
+public class SignalledFlushObserver implements FlushObserver {
+ private static class FlushLatch extends CountDownLatch { // one-shot latch tagged with the received-count it waits for
+ private final long seqnum;
+
+ public FlushLatch(long seqnum) {
+ super(1);
+ this.seqnum = seqnum;
+ }
+
+ public long getSequence() {
+ return seqnum;
+ }
+ }
+
+ // assume the number of outstanding flush requests is small so we don't
+ // need to organize by seqnum
+ private final List<FlushLatch> signals; // also used as the monitor guarding itself
+ // running totals; flush() compares them to decide whether anything is still outstanding
+ private final AtomicLong eventsReceived;
+ private final AtomicLong eventsDelivered;
+
+ public SignalledFlushObserver() {
+ signals = new ArrayList<FlushLatch>();
+ eventsReceived = new AtomicLong(0);
+ eventsDelivered = new AtomicLong(0);
+ }
+ // True while at least one flush latch is still registered in signals.
+ @Override
+ public boolean shouldDrainImmediately() {
+ synchronized (signals) {
+ return !signals.isEmpty();
+ }
+ }
+ // Returns a result that is already complete when nothing is outstanding, otherwise one backed by a new latch.
+ @Override
+ public AsyncFlushResult flush() {
+ final long seqnum = eventsReceived.get(); // snapshot taken before entering the lock
+ synchronized (signals) {
+ final FlushLatch flush;
+ if (seqnum <= eventsDelivered.get()) { // everything received up to the snapshot is already delivered
+ flush = null;
+ } else {
+ flush = new FlushLatch(seqnum);
+ signals.add(flush);
+ }
+
+ return new AsyncFlushResult() {
+ @Override
+ public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
+ return flush == null ? true : flush.await(timeout, unit); // await returns false if the timeout elapses first
+ }
+ };
+ }
+ }
+
+ /**
+ * Invoked when an event is received.
+ */
+ public void push() {
+ eventsReceived.incrementAndGet();
+ }
+
+ /**
+ * Invoked when a batch has been dispatched.
+ */
+ public void pop(int count) {
+ long highmark = eventsDelivered.addAndGet(count); // total delivered including this batch
+ synchronized (signals) {
+ for (ListIterator<FlushLatch> iter = signals.listIterator(); iter.hasNext(); ) {
+ FlushLatch flush = iter.next();
+ if (flush.getSequence() <= highmark) { // this flush's target count has been reached
+ flush.countDown();
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * Invoked when the queue is cleared.
+ */
+ public void clear() {
+ synchronized (signals) {
+ for (FlushLatch flush : signals) {
+ flush.countDown();
+ }
+ // every pending latch was released above; now drop them and restart both counters
+ signals.clear();
+ eventsReceived.set(0);
+ eventsDelivered.set(0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..c725ce5
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file.
+ *
+ * toData appends the serialized key after the parent's fields; toHoplogEventBytes writes the parent's fields only.
+ */
+public class SortedHDFSQueuePersistedEvent extends SortedHoplogPersistedEvent implements QueuedPersistentEvent {
+
+
+ /**key stored in serialized form*/
+ protected byte[] keyBytes = null;
+ // Builds the event from the gateway event's serialized value, operation, flags, version tag, key and creation time.
+ public SortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
+ ClassNotFoundException {
+ this(in.getSerializedValue(), in.getOperation(), in.getValueIsObject(), in
+ .getPossibleDuplicate(), in.getVersionTag(), in.getSerializedKey(), in
+ .getCreationTime());
+ }
+
+ public SortedHDFSQueuePersistedEvent(Object valueObject, Operation operation,
+ byte valueIsObject, boolean possibleDuplicate, VersionTag versionTag,
+ byte[] serializedKey, long timestamp) throws ClassNotFoundException, IOException {
+ super(valueObject, operation, valueIsObject, possibleDuplicate, versionTag, timestamp);
+ this.keyBytes = serializedKey;
+ // the key array is stored by reference, not copied
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ DataSerializer.writeByteArray(this.keyBytes, out); // key goes last, after the parent's fields
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.keyBytes = DataSerializer.readByteArray(in);
+ }
+
+ @Override
+ public void toHoplogEventBytes(DataOutput out) throws IOException {
+ super.toData(out); // parent's form only: keyBytes are not written
+ }
+
+ public byte[] getRawKey() {
+ return this.keyBytes;
+ }
+ public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+ // size reported by the parent class plus the key length
+ int size = SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+
+ size += keySize;
+
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..e8be7b8
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persistent event that is stored in a sorted hoplog. In addition
+ * to the fields of PersistedEventImpl, this event has a version tag.
+ *
+ * This class should only be serialized by directly calling toData,
+ * which is why it does not implement DataSerializable.
+ * After the parent's fields, toData writes either the version tag or, when there is none, the timestamp.
+ */
+public class SortedHoplogPersistedEvent extends PersistedEventImpl {
+ /** version tag for concurrency checks */
+ protected VersionTag versionTag;
+
+ /** timestamp of the event. Used when version checks are disabled*/
+ protected long timestamp;
+
+ public SortedHoplogPersistedEvent(Object valueObject, Operation operation,
+ byte valueIsObject, boolean possibleDuplicate, VersionTag tag, long timestamp) throws ClassNotFoundException, IOException {
+ super(valueObject, operation, valueIsObject, possibleDuplicate, tag != null); // has-version-tag flag mirrors tag != null
+ this.versionTag = tag;
+ this.timestamp = timestamp;
+ }
+
+ public SortedHoplogPersistedEvent() {
+ //for deserialization
+ }
+ // The version tag's timestamp when a tag is present, otherwise the separate timestamp field.
+ @Override
+ public long getTimstamp() {
+ return versionTag == null ? timestamp : versionTag.getVersionTimeStamp();
+ }
+ // NOTE(review): branches on versionTag == null while fromData branches on the flag; they agree only while flag and field match.
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ if (versionTag == null) {
+ out.writeLong(timestamp);
+ } else {
+ //TODO optimize these
+ DataSerializer.writeObject(this.versionTag, out); // the timestamp field is not written in this case
+ }
+ }
+ // Reads the parent's fields, then a version tag if the flags say one was written, otherwise the timestamp.
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ if (hasVersionTag()) {
+ this.versionTag = (VersionTag)DataSerializer.readObject(in);
+ } else {
+ this.timestamp = in.readLong();
+ }
+ }
+
+ /**
+ * @return the concurrency versioning tag for this event, if any
+ */
+ public VersionTag getVersionTag() {
+ return this.versionTag;
+ }
+ // Builds a new event by running fromData over the given bytes.
+ public static SortedHoplogPersistedEvent fromBytes(byte[] val)
+ throws IOException, ClassNotFoundException {
+ ByteArrayDataInput in = new ByteArrayDataInput();
+ in.initialize(val, null);
+ SortedHoplogPersistedEvent event = new SortedHoplogPersistedEvent();
+ event.fromData(in);
+ return event;
+ }
+ // Overrides PersistedEventImpl.copy; usersValue must be a SortedHoplogPersistedEvent or the casts below fail.
+ public void copy(PersistedEventImpl usersValue) {
+ super.copy(usersValue);
+ this.versionTag = ((SortedHoplogPersistedEvent) usersValue).versionTag;
+ this.timestamp = ((SortedHoplogPersistedEvent) usersValue).timestamp;
+ }
+ // The parent's size plus either the version tag's reported size or 8 bytes for the timestamp.
+ public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+ int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+
+ if (versionTag != null) {
+ size += versionTag.getSizeInBytes();
+ } else {
+ // size of Timestamp
+ size += 8;
+ }
+
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..93d596b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file.
+ *
+ * toData appends the serialized key after the parent's fields; toHoplogEventBytes writes the parent's fields only.
+ */
+public class UnsortedHDFSQueuePersistedEvent extends UnsortedHoplogPersistedEvent implements QueuedPersistentEvent {
+
+ /**the bytes of the key for this entry */
+ protected byte[] keyBytes = null;
+ // The timestamp passed to the parent is the version timestamp when it is non-zero, otherwise the creation time.
+ public UnsortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
+ ClassNotFoundException {
+ super(in.getValue(), in.getOperation(), in.getValueIsObject(), in.getPossibleDuplicate(),
+ in.getVersionTimeStamp() == 0 ? in.getCreationTime() : in.getVersionTimeStamp());
+ this.keyBytes = in.getSerializedKey();
+
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ DataSerializer.writeByteArray(this.keyBytes, out); // key goes last, after the parent's fields
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.keyBytes = DataSerializer.readByteArray(in);
+ }
+
+ @Override
+ public void toHoplogEventBytes(DataOutput out) throws IOException {
+ super.toData(out); // parent's form only: keyBytes are not written
+ }
+
+ public byte[] getRawKey() {
+ return this.keyBytes;
+ }
+
+ public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+ // size reported by the parent class plus the key length
+ int size = UnsortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+
+ size += keySize;
+
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..9b9a04d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persisted event that is stored in an unsorted (sequential) hoplog. This
+ * does not have a version stamp, but just a timestamp for the entry.
+ *
+ * This class should only be serialized by calling toData directly, which
+ * is why it does not implement DataSerializable.
+ *
+ * toData writes the parent's fields followed by the timestamp.
+ */
+public class UnsortedHoplogPersistedEvent extends PersistedEventImpl {
+ long timestamp; // package-private; serialized after the parent's fields
+
+
+
+ public UnsortedHoplogPersistedEvent() {
+ //for deserialization
+ }
+
+ public UnsortedHoplogPersistedEvent(Object value, Operation op,
+ byte valueIsObject, boolean isPossibleDuplicate, long timestamp) throws IOException,
+ ClassNotFoundException {
+ super(value, op, valueIsObject, isPossibleDuplicate, false/*hasVersionTag*/);
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public long getTimstamp() {
+ return timestamp;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ DataSerializer.writeLong(timestamp, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.timestamp = DataSerializer.readLong(in);
+ }
+ // Builds a new event by running fromData over the given bytes.
+ public static UnsortedHoplogPersistedEvent fromBytes(byte[] val)
+ throws IOException, ClassNotFoundException {
+ ByteArrayDataInput in = new ByteArrayDataInput();
+ in.initialize(val, null);
+ UnsortedHoplogPersistedEvent event = new UnsortedHoplogPersistedEvent();
+ event.fromData(in);
+ return event;
+ }
+ // Overrides PersistedEventImpl.copy; usersValue must be an UnsortedHoplogPersistedEvent or the cast below fails.
+ public void copy(PersistedEventImpl usersValue) {
+ super.copy(usersValue);
+ this.timestamp = ((UnsortedHoplogPersistedEvent) usersValue).timestamp;
+ }
+ // The parent's size plus 8 bytes for the timestamp.
+ public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+ int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+
+ // size of Timestamp
+ size += 8;
+
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
new file mode 100644
index 0000000..d2fdbe7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+
+import com.gemstone.gemfire.internal.hll.ICardinality;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.CompressionType;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Abstract class for {@link Hoplog} with common functionality
+ */
+public abstract class AbstractHoplog implements Hoplog {
+ // supplies the FileSystem used for operations on this hoplog
+ protected final FSProvider fsProvider;
+
+ // path of the oplog file
+ protected volatile Path path;
+ // descriptor built from the file name; rebuilt when the file is renamed
+ private volatile HoplogDescriptor hfd;
+ protected Configuration conf;
+ protected SortedOplogStatistics stats;
+ // cached file metadata: null until loaded lazily, reset to null by delete()
+ protected Long hoplogModificationTime;
+ protected Long hoplogSize;
+
+ protected HoplogReaderActivityListener readerListener;
+
+ // logger instance
+ protected static final Logger logger = LogService.getLogger();
+
+ // NOTE(review): this field is static but is reassigned by both constructors
+ // and by rename(), so all instances share whichever prefix was set last.
+ protected static String logPrefix;
+ // THIS CONSTRUCTOR SHOULD BE USED FOR LONER ONLY
+ // Wraps the supplied FileSystem directly in an FSProvider (no HDFS store)
+ // and sets the shared log prefix to the file name.
+ AbstractHoplog(FileSystem inputFS, Path filePath, SortedOplogStatistics stats)
+ throws IOException {
+ logPrefix = "<" + filePath.getName() + "> ";
+ this.fsProvider = new FSProvider(inputFS);
+ initialize(filePath, stats, inputFS);
+ }
+
+ /**
+ * Creates a hoplog backed by an HDFS store. The store is wrapped in an
+ * FSProvider and its current file system is used to qualify the path.
+ */
+ public AbstractHoplog(HDFSStoreImpl store, Path filePath,
+ SortedOplogStatistics stats) throws IOException {
+ logPrefix = "<" + filePath.getName() + "> ";
+ this.fsProvider = new FSProvider(store);
+ initialize(filePath, stats, store.getFileSystem());
+ }
+
+ private void initialize(Path path, SortedOplogStatistics stats, FileSystem fs) {
+ this.conf = fs.getConf();
+ this.stats = stats;
+ this.path = fs.makeQualified(path);
+ this.hfd = new HoplogDescriptor(this.path.getName());
+ }
+
+ // The following operations are left to concrete hoplog implementations.
+ @Override
+ public abstract void close() throws IOException;
+ @Override
+ public abstract HoplogReader getReader() throws IOException;
+
+ @Override
+ public abstract HoplogWriter createWriter(int keys) throws IOException;
+
+ @Override
+ abstract public void close(boolean clearCache) throws IOException;
+
+ /**
+ * Stores the listener in {@code readerListener}, replacing any previous one.
+ */
+ @Override
+ public void setReaderActivityListener(HoplogReaderActivityListener listener) {
+ this.readerListener = listener;
+ }
+
+ /**
+ * @return the file name held by the current descriptor
+ */
+ @Override
+ public String getFileName() {
+ return this.hfd.getFileName();
+ }
+
+ /**
+  * Orders hoplogs by comparing their descriptors.
+  *
+  * @param o hoplog to compare with; it is cast to {@code AbstractHoplog}
+  */
+ public final int compareTo(Hoplog o) {
+   final AbstractHoplog other = (AbstractHoplog) o;
+   return hfd.compareTo(other.hfd);
+ }
+
+ /**
+ * Default implementation: no estimate is available, so {@code null} is returned.
+ */
+ @Override
+ public ICardinality getEntryCountEstimate() throws IOException {
+ return null;
+ }
+
+ /**
+  * Renames the hoplog file to {@code name} inside its current parent
+  * directory, closes the hoplog, and then points this object (path,
+  * descriptor and log prefix) at the new file.
+  *
+  * @param name new file name, without directory
+  * @throws IOException if the file system reports that the rename failed; in
+  *         that case the state of this object is left untouched
+  */
+ @Override
+ public synchronized void rename(String name) throws IOException {
+   if (logger.isDebugEnabled()) {
+     logger.debug("{}Renaming hoplog to " + name, logPrefix);
+   }
+   Path parent = path.getParent();
+   Path newPath = new Path(parent, name);
+
+   // FileSystem.rename signals failure through its return value, so it must be
+   // checked; otherwise this object would end up pointing at a missing file.
+   if (!fsProvider.getFS().rename(path, newPath)) {
+     throw new IOException("Failed to rename hoplog " + path + " to " + newPath);
+   }
+
+   // close the old reader and let the new one get created lazily
+   close();
+
+   // update path to point to the new path
+   path = newPath;
+   this.hfd = new HoplogDescriptor(this.path.getName());
+   logPrefix = "<" + path.getName() + "> ";
+ }
+
+ /**
+  * Closes this hoplog, forgets the cached size and modification time, and
+  * removes the file (non-recursively) from the file system.
+  */
+ @Override
+ public synchronized void delete() throws IOException {
+   if (logger.isDebugEnabled()) {
+     logger.debug("{}Deleting hoplog", logPrefix);
+   }
+
+   close();
+
+   // cached metadata no longer describes an existing file
+   this.hoplogModificationTime = null;
+   this.hoplogSize = null;
+
+   fsProvider.getFS().delete(path, false);
+ }
+
+ /**
+  * Returns the modification time of the hoplog file, loading it from the
+  * file system on first use.
+  *
+  * @return modification time as reported by the file system
+  * @throws IllegalStateException if no modification time could be obtained,
+  *         e.g. because the file does not exist
+  */
+ @Override
+ public long getModificationTimeStamp() {
+   initHoplogSizeTimeInfo();
+
+   // Read the field once: delete() may reset it to null concurrently, and
+   // unboxing a null after a separate null check would throw an NPE.
+   final Long modificationTime = hoplogModificationTime;
+
+   // modification time will not be null if this hoplog exists. Otherwise
+   // invocation of this method is invalid
+   if (modificationTime == null) {
+     throw new IllegalStateException(
+         "Modification time is not available for hoplog " + path);
+   }
+
+   return modificationTime;
+ }
+
+ /**
+  * Returns the length of the hoplog file, loading it from the file system on
+  * first use.
+  *
+  * @return file length as reported by the file system
+  * @throws IllegalStateException if no size could be obtained, e.g. because
+  *         the file does not exist
+  */
+ @Override
+ public long getSize() {
+   initHoplogSizeTimeInfo();
+
+   // Read the field once: delete() may reset it to null concurrently, and
+   // unboxing a null after a separate null check would throw an NPE.
+   final Long size = hoplogSize;
+
+   // size will not be null if this hoplog exists. Otherwise
+   // invocation of this method is invalid
+   if (size == null) {
+     throw new IllegalStateException("Size is not available for hoplog " + path);
+   }
+
+   return size;
+ }
+
+ private synchronized void initHoplogSizeTimeInfo() {
+ if (hoplogSize != null && hoplogModificationTime != null) {
+ // time and size info is already initialized. no work needed here
+ return;
+ }
+
+ try {
+ FileStatus[] filesInfo = FSUtils.listStatus(fsProvider.getFS(), path, null);
+ if (filesInfo != null && filesInfo.length == 1) {
+ this.hoplogModificationTime = filesInfo[0].getModificationTime();
+ this.hoplogSize = filesInfo[0].getLen();
+ }
+ // TODO else condition may happen if user deletes hoplog from the file system.
+ } catch (IOException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
+ throw new HDFSIOException(
+ LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),e);
+ }
+ }
+ /**
+ * Convenience overload that delegates to the four-argument variant with a
+ * {@code null} version.
+ */
+ public static SequenceFile.Writer getSequenceFileWriter(Path path,
+ Configuration conf, Logger logger) throws IOException {
+ return getSequenceFileWriter(path,conf, logger, null);
+ }
+
+ /**
+  * Creates a sequence file writer for {@code path} whose keys and values are
+  * {@link BytesWritable}. Compression is chosen by {@code withCompression},
+  * and the ordinal of the GemFire version is stored in the file metadata for
+  * future versioning of the key and value format.
+  *
+  * @param path file to create
+  * @param conf configuration passed to the writer
+  * @param logger logger used for debug output
+  * @param version - is being used only for testing. Should be passed as null
+  *        for other purposes, in which case {@code Version.CURRENT} is used.
+  * @return SequenceFile.Writer
+  * @throws IOException if the writer cannot be created
+  */
+ public static SequenceFile.Writer getSequenceFileWriter(Path path,
+     Configuration conf, Logger logger, Version version) throws IOException {
+   final Option fileOption = SequenceFile.Writer.file(path);
+   final Option keyOption = SequenceFile.Writer.keyClass(BytesWritable.class);
+   final Option valueOption = SequenceFile.Writer.valueClass(BytesWritable.class);
+   final Option compressionOption = withCompression(logger);
+   if (logger.isDebugEnabled()) {
+     logger.debug("{}Started creating hoplog " + path, logPrefix);
+   }
+
+   final Version effectiveVersion = (version == null) ? Version.CURRENT : version;
+
+   // record the gemfire version in the file metadata
+   final SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+   metadata.set(new Text(Meta.GEMFIRE_VERSION.name()),
+       new Text(String.valueOf(effectiveVersion.ordinal())));
+   final Option metadataOption = SequenceFile.Writer.metadata(metadata);
+
+   return SequenceFile.createWriter(conf, fileOption, keyOption, valueOption,
+       compressionOption, metadataOption);
+ }
+
+ /**
+  * Builds the compression option for a sequence file writer from the system
+  * property named by {@code HoplogConfig.COMPRESSION}.
+  *
+  * When the property is unset, no compression is used. Otherwise its value
+  * selects the codec, matched without regard to case: {@code SNAPPY},
+  * {@code LZ4} or {@code GZ}; the file is then block-compressed.
+  *
+  * @throws IllegalStateException if the property names an unknown codec
+  */
+ private static Option withCompression(Logger logger) {
+   String prop = System.getProperty(HoplogConfig.COMPRESSION);
+   if (prop == null) {
+     return SequenceFile.Writer.compression(CompressionType.NONE, null);
+   }
+
+   CompressionCodec codec;
+   if (prop.equalsIgnoreCase("SNAPPY")) {
+     codec = new SnappyCodec();
+   } else if (prop.equalsIgnoreCase("LZ4")) {
+     codec = new Lz4Codec();
+   } else if (prop.equalsIgnoreCase("GZ")) {
+     // matched case-insensitively, consistent with the other codec names
+     codec = new GzipCodec();
+   } else {
+     throw new IllegalStateException("Unsupported codec: " + prop);
+   }
+   if (logger.isDebugEnabled()) {
+     logger.debug("{}Using compression codec " + codec, logPrefix);
+   }
+   return SequenceFile.Writer.compression(CompressionType.BLOCK, codec);
+ }
+
+ public static final class HoplogDescriptor implements Comparable<HoplogDescriptor> {
+ private final String fileName;
+ private final String bucket;
+ private final int sequence;
+ private final long timestamp;
+ private final String extension;
+
+ HoplogDescriptor(final String fileName) {
+ this.fileName = fileName;
+ final Matcher matcher = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(fileName);
+ final boolean matched = matcher.find();
+ assert matched;
+ this.bucket = matcher.group(1);
+ this.sequence = Integer.valueOf(matcher.group(3));
+ this.timestamp = Long.valueOf(matcher.group(2));
+ this.extension = matcher.group(4);
+ }
+
+ public final String getFileName() {
+ return fileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof HoplogDescriptor)) {
+ return false;
+ }
+
+ final HoplogDescriptor other = (HoplogDescriptor)o;
+ // the two files should belong to same bucket
+ assert this.bucket.equals(other.bucket);
+
+ // compare sequence first
+ if (this.sequence != other.sequence) {
+ return false;
+ }
+
+ // sequence is same, compare timestamps
+ if (this.timestamp != other.timestamp) {
+ return false;
+ }
+
+ return extension.equals(other.extension);
+ }
+
+ @Override
+ public int compareTo(HoplogDescriptor o) {
+ if (this == o) {
+ return 0;
+ }
+
+ // the two files should belong to same bucket
+ assert this.bucket.equals(o.bucket);
+
+ // compare sequence first
+ if (sequence > o.sequence) {
+ return -1;
+ } else if (sequence < o.sequence) {
+ return 1;
+ }
+
+ // sequence is same, compare timestamps
+ if(timestamp > o.timestamp) {
+ return -1;
+ } else if (timestamp < o.timestamp) {
+ return 1;
+ }
+
+ //timestamp is the same, compare the file extension. It's
+ //possible a major compaction and minor compaction could finish
+ //at the same time and create the same timestamp and sequence number
+ //it doesn't matter which file we look at first in that case.
+ return extension.compareTo(o.extension);
+ }
+
+
+ }
+
+ /**
+  * Hands out the {@link FileSystem} a hoplog should use. It is backed either
+  * by an HDFS store, which is asked for its file system on every call, or by
+  * a fixed file system instance when no store is available.
+  */
+ protected static final class FSProvider {
+   final FileSystem fs;
+   final HDFSStoreImpl store;
+
+   // THIS METHOD IS FOR TESTING ONLY
+   FSProvider(FileSystem fs) {
+     this.fs = fs;
+     this.store = null;
+   }
+
+   FSProvider(HDFSStoreImpl store) {
+     this.store = store;
+     fs = null;
+   }
+
+   /**
+    * @return the store's file system when a store is present, otherwise the
+    *         fixed file system supplied at construction
+    */
+   public FileSystem getFS() throws IOException {
+     if (store != null) {
+       return store.getFileSystem();
+     }
+     return fs;
+   }
+
+   /**
+    * Asks the store to check and clear its file system and returns the
+    * store's cached file system. Without a store there is nothing to check,
+    * so the fixed file system is returned, as {@link #getFS()} does.
+    */
+   public FileSystem checkFileSystem() {
+     if (store == null) {
+       return fs;
+     }
+     store.checkAndClearFileSystem();
+     return store.getCachedFileSystem();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
new file mode 100644
index 0000000..4f078d8
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog.HoplogDescriptor;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+
+public abstract class AbstractHoplogOrganizer<T extends PersistedEventImpl> implements HoplogOrganizer<T> {
+
+ // file name extensions used for the different kinds of hoplog files
+ // (".ihop" marks an intermediate hoplog)
+ public static final String MINOR_HOPLOG_EXTENSION = ".ihop";
+ public static final String MAJOR_HOPLOG_EXTENSION = ".chop";
+ // appended to a hoplog's name to form the name of its expiry marker file
+ public static final String EXPIRED_HOPLOG_EXTENSION = ".exp";
+ // appended to the name of a hoplog that is still temporary
+ public static final String TEMP_HOPLOG_EXTENSION = ".tmp";
+
+ public static final String FLUSH_HOPLOG_EXTENSION = ".hop";
+ public static final String SEQ_HOPLOG_EXTENSION = ".shop";
+
+ // all valid hoplogs will follow the following name pattern:
+ // <bucket>-<timestamp>-<sequence>.<extension> (capture groups 1 to 4)
+ public static final String HOPLOG_NAME_REGEX = "(.+?)-(\\d+?)-(\\d+?)";
+ public static final Pattern HOPLOG_NAME_PATTERN = Pattern.compile(HOPLOG_NAME_REGEX
+ + "\\.(.*)");
+
+ // when true, pingSecondaries() returns immediately without pinging
+ public static boolean JUNIT_TEST_RUN = false;
+
+ // enabled by the system property below or whenever JVM assertions are on
+ protected static final boolean ENABLE_INTEGRITY_CHECKS = Boolean
+ .getBoolean("gemfire.HdfsSortedOplogOrganizer.ENABLE_INTEGRITY_CHECKS")
+ || assertionsEnabled();
+
+ /**
+ * @return true if JVM assertions are enabled for this class
+ */
+ private static boolean assertionsEnabled() {
+ boolean enabled = false;
+ // intentional side effect: the assignment only runs when assertions are on
+ assert enabled = true;
+ return enabled;
+ }
+
+ protected HdfsRegionManager regionManager;
+ // name or id of bucket managed by this organizer
+ protected final String regionFolder;
+ protected final int bucketId;
+
+ // path of the region directory
+ protected final Path basePath;
+ // identifies path of directory containing a bucket's oplog files
+ protected final Path bucketPath;
+
+ protected final HDFSStoreImpl store;
+
+ // assigns a unique increasing number to each oplog file
+ // NOTE(review): not assigned in this class's constructor; presumably
+ // subclasses initialize it before getTmpSortedOplog() is used - confirm
+ protected AtomicInteger sequence;
+
+ //logger instance
+ protected static final Logger logger = LogService.getLogger();
+ // "<regionFolder/bucketId> " prefix for log messages
+ protected final String logPrefix;
+
+ protected SortedOplogStatistics stats;
+ // local copy of the store usage attributed to this bucket, in bytes
+ AtomicLong bucketDiskUsage = new AtomicLong(0);
+
+ // creation of new files and expiration of files will be synchronously
+ // notified to the listener
+ protected HoplogListener listener;
+
+ // set by close(); see isClosed()
+ private volatile boolean closed = false;
+
+ protected Object changePrimarylockObject = new Object();
+
+ /**
+ * Creates an organizer for one bucket of a region. Region folder, store,
+ * listener and statistics are taken from the region manager; the bucket
+ * directory is {@code <store home dir>/<region folder>/<bucketId>}.
+ *
+ * @param region manager of the owning region, must not be null
+ * @param bucketId id of the bucket managed by this organizer
+ */
+ public AbstractHoplogOrganizer(HdfsRegionManager region, int bucketId) {
+
+ assert region != null;
+
+ this.regionManager = region;
+ this.regionFolder = region.getRegionFolder();
+ this.store = region.getStore();
+ this.listener = region.getListener();
+ this.stats = region.getHdfsStats();
+
+ this.bucketId = bucketId;
+
+ this.basePath = new Path(store.getHomeDir());
+ this.bucketPath = new Path(basePath, regionFolder + "/" + bucketId);
+
+ this.logPrefix = "<" + getRegionBucketStr() + "> ";
+
+ }
+
+ /**
+ * @return true if this organizer was closed or its region manager is closed
+ */
+ @Override
+ public boolean isClosed() {
+ return closed || regionManager.isClosed();
+ }
+
+ /**
+  * Marks this organizer as closed and subtracts the bucket's recorded disk
+  * usage from the store usage statistic.
+  */
+ @Override
+ public void close() throws IOException {
+   closed = true;
+
+   // this bucket is closed and may be owned by a new node. So reduce the store
+   // usage stat, as the new owner adds the usage metric
+   final long recordedUsage = bucketDiskUsage.get();
+   incrementDiskUsage(-recordedUsage);
+ }
+
+ // The following operations are left to concrete organizers.
+ @Override
+ public abstract void flush(Iterator<? extends QueuedPersistentEvent> bufferIter,
+ int count) throws IOException, ForceReattemptException;
+
+ @Override
+ public abstract void clear() throws IOException;
+
+ // creates the Hoplog object for the file at the given path
+ protected abstract Hoplog getHoplog(Path hoplogPath) throws IOException;
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
+ throws IOException {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs)
+ throws IOException {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public void compactionCompleted(String region, int bucket, boolean isMajor) {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public T read(byte[] key) throws IOException {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public HoplogIterator<byte[], T> scan() throws IOException {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public HoplogIterator<byte[], T> scan(byte[] from, byte[] to)
+ throws IOException {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public HoplogIterator<byte[], T> scan(byte[] from,
+ boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Not supported by this base class: always throws
+ * {@link UnsupportedOperationException} naming the concrete class.
+ */
+ @Override
+ public long sizeEstimate() {
+ throw new UnsupportedOperationException("Not supported for "
+ + this.getClass().getSimpleName());
+ }
+
+ /**
+ * @return the oplog's full path, formed by prefixing the bucket path to the
+ * oplog's file name
+ */
+ protected String getPathStr(Hoplog oplog) {
+ return bucketPath.toString() + "/" + oplog.getFileName();
+ }
+
+ /**
+ * @return {@code <regionFolder>/<bucketId>}, used as the log prefix body
+ */
+ protected String getRegionBucketStr() {
+ return regionFolder + "/" + bucketId;
+ }
+
+ /**
+ * Deserializes a sorted hoplog event from its byte form.
+ *
+ * @return the event, or {@code null} if a required class could not be found
+ * (the failure is logged as an error)
+ */
+ protected SortedHoplogPersistedEvent deserializeValue(byte[] val) throws IOException {
+ try {
+ return SortedHoplogPersistedEvent.fromBytes(val);
+ } catch (ClassNotFoundException e) {
+ logger
+ .error(
+ LocalizedStrings.GetMessage_UNABLE_TO_DESERIALIZE_VALUE_CLASSNOTFOUNDEXCEPTION,
+ e);
+ return null;
+ }
+ }
+
+ /**
+ * @return true if the entry belongs to an destroy event
+ */
+ protected boolean isDeletedEntry(byte[] value, int offset) throws IOException {
+ // Read only the first byte of PersistedEventImpl for the operation
+ assert value != null && value.length > 0 && offset >= 0 && offset < value.length;
+ Operation op = Operation.fromOrdinal(value[offset]);
+
+ if (op.isDestroy() || op.isInvalidate()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @param seqNum
+ * desired sequence number of the hoplog. If null a highest number is
+ * choosen
+ * @param extension
+ * file extension representing the type of file, e.g. ihop for
+ * intermediate hoplog
+ * @return a new temporary file for a new sorted oplog. The name consists of
+ * bucket name, a sequence number for ordering the files followed by a
+ * timestamp
+ */
+ Hoplog getTmpSortedOplog(Integer seqNum, String extension) throws IOException {
+ if (seqNum == null) {
+ seqNum = sequence.incrementAndGet();
+ }
+ String name = bucketId + "-" + System.currentTimeMillis() + "-" + seqNum
+ + extension;
+ Path soplogPath = new Path(bucketPath, name + TEMP_HOPLOG_EXTENSION);
+ return getHoplog(soplogPath);
+ }
+
+ /**
+ * renames a temporary hoplog file to a legitimate name.
+ */
+ static void makeLegitimate(Hoplog so) throws IOException {
+ String name = so.getFileName();
+ assert name.endsWith(TEMP_HOPLOG_EXTENSION);
+
+ int index = name.lastIndexOf(TEMP_HOPLOG_EXTENSION);
+ name = name.substring(0, index);
+ so.rename(name);
+ }
+
+ /**
+ * creates a expiry marker for a file on file system
+ *
+ * @param hoplog
+ * @throws IOException
+ */
+ protected void addExpiryMarkerForAFile(Hoplog hoplog) throws IOException {
+ FileSystem fs = store.getFileSystem();
+
+ // TODO optimization needed here. instead of creating expired marker
+ // file per file, create a meta file. the main thing to worry is
+ // compaction of meta file itself
+ Path expiryMarker = getExpiryMarkerPath(hoplog.getFileName());
+
+ // uh-oh, why are we trying to expire an already expired file?
+ if (ENABLE_INTEGRITY_CHECKS) {
+ Assert.assertTrue(!fs.exists(expiryMarker),
+ "Expiry marker already exists: " + expiryMarker);
+ }
+
+ FSDataOutputStream expiryMarkerFile = fs.create(expiryMarker);
+ expiryMarkerFile.close();
+
+ if (logger.isDebugEnabled())
+ logger.debug("Hoplog marked expired: " + getPathStr(hoplog));
+ }
+
+ /**
+ * @return path, inside the bucket directory, of the expiry marker for the
+ * file with the given name
+ */
+ protected Path getExpiryMarkerPath(String name) {
+ return new Path(bucketPath, name + EXPIRED_HOPLOG_EXTENSION);
+ }
+
+ protected String truncateExpiryExtension(String name) {
+ if (name.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
+ return name.substring(0, name.length() - EXPIRED_HOPLOG_EXTENSION.length());
+ }
+
+ return name;
+ }
+
+ /**
+ * updates region stats and a local copy of bucket level store usage metric.
+ *
+ * @param delta
+ */
+ protected void incrementDiskUsage(long delta) {
+ long newSize = bucketDiskUsage.addAndGet(delta);
+ if (newSize < 0 && delta < 0) {
+ if (logger.isDebugEnabled()){
+ logger.debug("{}Invalid diskUsage size:" + newSize + " caused by delta:"
+ + delta + ", parallel del & close?" + isClosed(), logPrefix);
+ }
+ if (isClosed()) {
+ // avoid corrupting disk usage size during close by reducing residue
+ // size only
+ delta = delta + (-1 * newSize);
+ }
+ }
+ stats.incStoreUsageBytes(delta);
+ }
+
+ /**
+  * Utility method to remove a file from the valid file list if an expired
+  * marker for the file exists. A marker matches a file when the marker's name
+  * equals the file's name followed by the expiry extension.
+  *
+  * @param valid
+  *          list of valid files, may be null
+  * @param expired
+  *          list of expired file markers, may be null
+  * @return list of valid files that do not have an expired file marker;
+  *         {@code null} if {@code valid} is null, and {@code valid} itself if
+  *         {@code expired} is null
+  */
+ public static FileStatus[] filterValidHoplogs(FileStatus[] valid,
+     FileStatus[] expired) {
+   if (valid == null) {
+     return null;
+   }
+
+   if (expired == null) {
+     return valid;
+   }
+
+   ArrayList<FileStatus> result = new ArrayList<FileStatus>();
+   for (FileStatus vs : valid) {
+     // the marker name depends only on the valid file: build it once
+     final String markerName = vs.getPath().getName()
+         + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
+     boolean found = false;
+     for (FileStatus ex : expired) {
+       if (ex.getPath().getName().equals(markerName)) {
+         found = true;
+         // one marker is enough, no need to look at the remaining ones
+         break;
+       }
+     }
+     if (!found) {
+       result.add(vs);
+     }
+   }
+
+   return result.toArray(new FileStatus[result.size()]);
+ }
+
+ protected void pingSecondaries() throws ForceReattemptException {
+
+ if (JUNIT_TEST_RUN)
+ return;
+ BucketRegion br = ((PartitionedRegion)this.regionManager.getRegion()).getDataStore().getLocalBucketById(this.bucketId);
+ boolean secondariesPingable = false;
+ try {
+ secondariesPingable = br.areSecondariesPingable();
+ } catch (Throwable e) {
+ throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
+ this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()), e);
+ }
+ if (!secondariesPingable)
+ throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
+ this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()));
+ }
+
+
+
+
+ /**
+  * Orders tracked hoplogs by the natural ordering of the hoplogs they refer
+  * to, which is derived from their file names.
+  */
+ public static final class HoplogComparator implements
+     Comparator<TrackedReference<Hoplog>> {
+   /**
+    * Delegates to {@code Hoplog.compareTo} of the referenced hoplogs.
+    */
+   @Override
+   public int compare(TrackedReference<Hoplog> o1, TrackedReference<Hoplog> o2) {
+     final Hoplog first = o1.get();
+     final Hoplog second = o2.get();
+     return first.compareTo(second);
+   }
+
+   /**
+    * Compares the age of two files based on their names.
+    *
+    * @return the result of comparing the two names' descriptors: negative
+    *         if name1 has the higher sequence (or, on a tie, the higher
+    *         timestamp) and is therefore younger, positive if name1 is older
+    */
+   public static int compareByName(String name1, String name2) {
+     final HoplogDescriptor first = new HoplogDescriptor(name1);
+     final HoplogDescriptor second = new HoplogDescriptor(name2);
+     return first.compareTo(second);
+   }
+ }
+
+ /**
+  * Extracts the timestamp from a matched hoplog file name.
+  *
+  * @param matcher
+  *          a matcher that has already been matched successfully against a
+  *          hoplog name
+  * @return the value of the matcher's second group parsed as a long
+  */
+ public static long getHoplogTimestamp(Matcher matcher) {
+   final String timestampText = matcher.group(2);
+   return Long.parseLong(timestampText);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
new file mode 100644
index 0000000..86e66a1
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+/**
+ * Probabilistic membership test over keys given as byte arrays.
+ */
+public interface BloomFilter {
+ /**
+ * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
+ * is such that false positives are allowed, but false negatives cannot occur.
+ */
+ boolean mightContain(byte[] key);
+
+ /**
+ * Same as {@link #mightContain(byte[])}, but for a key that occupies only part of the
+ * supplied array. False positives are allowed, but false negatives cannot occur.
+ *
+ * @param key array holding the key bytes
+ * @param keyOffset presumably the index in {@code key} where the key starts
+ * @param keyLength presumably the number of key bytes, starting at {@code keyOffset}
+ */
+ boolean mightContain(byte[] key, int keyOffset, int keyLength);
+
+ /**
+ * @return Size of the bloom, in bytes
+ */
+ long getBloomSize();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
new file mode 100644
index 0000000..3f67de8
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.Collection;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Timer task for the streaming case. If the traffic of a bucket goes down
+ * after a few batches were written, flush is no longer called and the file
+ * stays in tmp state until flushing resumes. This task periodically walks
+ * over the buckets and closes their writer once the rollover time has passed.
+ *
+ * It is also responsible for fixing the sizes of files that were not closed
+ * properly last time.
+ */
+class CloseTmpHoplogsTimerTask extends SystemTimerTask {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private HdfsRegionManager hdfsRegionManager;
+  private FileSystem filesystem;
+
+  /**
+   * Creates the task and a file system instance dedicated to it.
+   */
+  public CloseTmpHoplogsTimerTask(HdfsRegionManager hdfsRegionManager) {
+    this.hdfsRegionManager = hdfsRegionManager;
+
+    // Why a separate file system is created:
+    // For HDFS, if a file wasn't closed properly last time,
+    // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
+    // FSNamesystem.recoverLeaseInternal function gets called.
+    // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file,
+    // created using the same FileSystem object. This is a bug and is being tracked at:
+    // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
+    //
+    // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug,
+    // we create a new file system for the timer task so that it does not encounter the bug.
+    this.filesystem = this.hdfsRegionManager.getStore().createFileSystem();
+    if (logger.isDebugEnabled()) {
+      logger.debug("created a new file system specifically for timer task");
+    }
+  }
+
+  /**
+   * Visits every bucket organizer and closes its writer when the time since
+   * the last flush has reached the rollover interval. Tmp files left over by
+   * an earlier unsuccessful run are fixed as well. The run stops as soon as
+   * the region's readiness check fails; failures for a single bucket are
+   * logged as warnings and do not stop the run.
+   */
+  @Override
+  public void run2() {
+    Collection<HoplogOrganizer> bucketOrganizers = hdfsRegionManager.getBucketOrganizers();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Starting the close temp logs run.");
+    }
+
+    for (HoplogOrganizer candidate : bucketOrganizers) {
+      HDFSUnsortedHoplogOrganizer bucketOrganizer = (HDFSUnsortedHoplogOrganizer)candidate;
+      // seconds elapsed since the bucket was last flushed
+      long timeSinceLastFlush = (System.currentTimeMillis() - bucketOrganizer.getLastFlushTime())/1000 ;
+
+      try {
+        this.hdfsRegionManager.getRegion().checkReadiness();
+      } catch (Exception notReady) {
+        // region is not usable: abandon the rest of this run
+        break;
+      }
+
+      try {
+        // the time since last flush has exceeded file rollover interval, roll over the
+        // file.
+        if (timeSinceLastFlush >= bucketOrganizer.getfileRolloverInterval()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Closing writer for bucket: " + bucketOrganizer.bucketId);
+          }
+          bucketOrganizer.synchronizedCloseWriter(false, timeSinceLastFlush, 0);
+        }
+
+        // fix the tmp hoplogs, if any. Pass the new file system here.
+        bucketOrganizer.identifyAndFixTmpHoplogs(this.filesystem);
+      } catch (Exception e) {
+        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
+      }
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
new file mode 100644
index 0000000..55d8f87
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Result of a compaction task for one bucket, as reported through the task's
+ * future. Carries the bucket id and a boolean outcome flag.
+ */
+public class CompactionStatus implements VersionedDataSerializable {
+  // MergeGemXDHDFSToGFE: check and verify the serialization versions
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+  private int bucketId;
+  private boolean status;
+
+  /** No-argument constructor; the fields are filled in by {@link #fromData(DataInput)}. */
+  public CompactionStatus() {
+  }
+
+  /**
+   * @param bucketId id of the bucket the compaction was requested for
+   * @param status outcome flag of the compaction
+   */
+  public CompactionStatus(int bucketId, boolean status) {
+    this.bucketId = bucketId;
+    this.status = status;
+  }
+
+  /** @return id of the bucket this status belongs to */
+  public int getBucketId() {
+    return this.bucketId;
+  }
+
+  /** @return the outcome flag recorded for the compaction */
+  public boolean isStatus() {
+    return this.status;
+  }
+
+  /** Writes the bucket id followed by the status flag. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(this.bucketId);
+    out.writeBoolean(this.status);
+  }
+
+  /** Reads the bucket id followed by the status flag, mirroring {@link #toData(DataOutput)}. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    bucketId = in.readInt();
+    status = in.readBoolean();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+
+  /** @return class name, identity hash code, bucket id and status in a single line */
+  @Override
+  public String toString() {
+    return getClass().getCanonicalName() + "@" + System.identityHashCode(this)
+        + " Bucket:" + bucketId + " status:" + status;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
new file mode 100644
index 0000000..84beded
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Result of a flush request for one bucket. A status whose bucket id equals
+ * the {@code LAST} marker is recognised by {@link #isLast()}.
+ */
+public class FlushStatus implements VersionedDataSerializable {
+  // bucket id used as the marker value checked by isLast()
+  private final static int LAST = -1;
+
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+  private int bucketId;
+
+  /** No-argument constructor; the bucket id is filled in by {@link #fromData(DataInput)}. */
+  public FlushStatus() {
+  }
+
+  /**
+   * @param bucketId id of the bucket the flush result refers to
+   */
+  public FlushStatus(int bucketId) {
+    this.bucketId = bucketId;
+  }
+
+  /** @return a new status carrying the {@code LAST} marker as its bucket id */
+  public static FlushStatus last() {
+    return new FlushStatus(LAST);
+  }
+
+  /** @return id of the bucket this status refers to */
+  public int getBucketId() {
+    return this.bucketId;
+  }
+
+  /** @return true when the bucket id equals the {@code LAST} marker */
+  public boolean isLast() {
+    return LAST == this.bucketId;
+  }
+
+  /** Writes the bucket id. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(this.bucketId);
+  }
+
+  /** Reads the bucket id written by {@link #toData(DataOutput)}. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    bucketId = in.readInt();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+
+  /** @return class name, identity hash code and bucket id in a single line */
+  @Override
+  public String toString() {
+    return getClass().getCanonicalName() + "@" + System.identityHashCode(this)
+        + " Bucket:" + bucketId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
new file mode 100644
index 0000000..ba191c2
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Schedules compaction of hoplogs owned by this node as primary and manages the
+ * executors of ongoing compactions. One manager exists per HDFS store name (see
+ * {@link #getInstance(HDFSStore)}). Ideally the number of pending requests will not
+ * exceed the number of buckets in the node, as the hoplog organizer avoids creating a
+ * new request if compaction on the bucket is active. Separate executors, each with its
+ * own queue, are maintained for major and minor compactions so that long running major
+ * compactions do not hold up minor compactions.
+ */
+public class HDFSCompactionManager {
+  /*
+   * Each hdfs store has its own concurrency configuration. Concurrency
+   * configuration is used by compaction manager to manage threads. This member
+   * holds the hdfs-store name to compaction manager mapping
+   */
+  private static final ConcurrentHashMap<String, HDFSCompactionManager> storeToManagerMap =
+      new ConcurrentHashMap<String, HDFSCompactionManager>();
+
+  // hdfs store configuration used to initialize this instance
+  HDFSStore storeConfig;
+
+  // Executor for ordered execution of minor compaction requests.
+  private final CompactionExecutor minorCompactor;
+  // Executor for ordered execution of major compaction requests.
+  private final CompactionExecutor majorCompactor;
+
+  private static final Logger logger = LogService.getLogger();
+  protected final static String logPrefix = "<" + "HDFSCompactionManager" + "> ";
+
+  /**
+   * Creates the minor and major executors, sized from the store's configured thread
+   * counts. The queue capacity is read from the system property named by
+   * {@code HoplogConfig.COMPCATION_QUEUE_CAPACITY}.
+   */
+  private HDFSCompactionManager(HDFSStore config) {
+    this.storeConfig = config;
+    // configure hdfs compaction manager
+    int capacity = Integer.getInteger(HoplogConfig.COMPCATION_QUEUE_CAPACITY,
+        HoplogConfig.COMPCATION_QUEUE_CAPACITY_DEFAULT);
+
+    // core-thread timeout is enabled by the CompactionExecutor constructor itself
+    minorCompactor = new CompactionExecutor(config.getMinorCompactionThreads(), capacity,
+        "MinorCompactor_" + config.getName());
+
+    majorCompactor = new CompactionExecutor(config.getMajorCompactionThreads(), capacity,
+        "MajorCompactor_" + config.getName());
+  }
+
+  /**
+   * Returns the manager registered for the store's name, creating and registering a
+   * new one on first use.
+   *
+   * @param config
+   *          hdfs store whose name identifies the manager
+   * @return the manager for {@code config.getName()}
+   */
+  public static synchronized HDFSCompactionManager getInstance(HDFSStore config) {
+    HDFSCompactionManager instance = storeToManagerMap.get(config.getName());
+    if (instance == null) {
+      instance = new HDFSCompactionManager(config);
+      storeToManagerMap.put(config.getName(), instance);
+    }
+
+    return instance;
+  }
+
+  /**
+   * Accepts compaction request for asynchronous compaction execution.
+   *
+   * @param request
+   *          compaction request with region and bucket id
+   * @return a future for the compaction result if the request was accepted; null if
+   *         the request was ignored because the compactor is busy, or if the executor
+   *         refused it (compaction disabled, pool being reduced, queue full or any
+   *         other submission failure)
+   */
+  public synchronized Future<CompactionStatus> submitRequest(CompactionRequest request) {
+    if (!request.isForced && request.compactor.isBusy(request.isMajor)) {
+      if (logger.isDebugEnabled()) {
+        fineLog("Compactor is busy. Ignoring ", request);
+      }
+      return null;
+    }
+
+    CompactionExecutor executor = request.isMajor ? majorCompactor : minorCompactor;
+
+    try {
+      return executor.submit(request);
+    } catch (CompactionIsDisabled e) {
+      // expected refusal; fineLog adds the prefix itself
+      if (logger.isDebugEnabled()) {
+        fineLog(e.getMessage());
+      }
+    } catch (Throwable e) {
+      // kept as broad as before: submission failures never propagate to the caller
+      logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, "Compaction request submission failed"), e);
+    }
+    return null;
+  }
+
+  /**
+   * Removes all pending compaction requests. Programmed for TESTING ONLY
+   */
+  public void reset() {
+    minorCompactor.shutdownNow();
+    majorCompactor.shutdownNow();
+    HDFSCompactionManager.storeToManagerMap.remove(storeConfig.getName());
+  }
+
+  /**
+   * Returns minor compactor. Programmed for TESTING AND MONITORING ONLY
+   */
+  public ThreadPoolExecutor getMinorCompactor() {
+    return minorCompactor;
+  }
+
+  /**
+   * Returns major compactor. Programmed for TESTING AND MONITORING ONLY
+   */
+  public ThreadPoolExecutor getMajorCompactor() {
+    return majorCompactor;
+  }
+
+  /**
+   * Contains important details needed for executing a compaction cycle. Two requests
+   * are equal when they name the same region folder and bucket; the major/forced
+   * flags do not take part in equality.
+   */
+  public static class CompactionRequest implements Callable<CompactionStatus> {
+    String regionFolder;
+    int bucket;
+    Compactor compactor;
+    boolean isMajor;
+    final boolean isForced;
+    final boolean versionUpgrade;
+
+    /** Creates a request that is neither forced nor a version upgrade. */
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major) {
+      this(regionFolder, bucket, compactor, major, false);
+    }
+
+    /** Creates a request that is not a version upgrade. */
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced) {
+      this(regionFolder, bucket, compactor, major, isForced, false);
+    }
+
+    /**
+     * @param regionFolder region folder the bucket belongs to
+     * @param bucket bucket id
+     * @param compactor compactor that executes the cycle
+     * @param major true for a major compaction, false for a minor one
+     * @param isForced true to run even when the store has this kind of compaction disabled
+     * @param versionUpgrade passed through to {@code Compactor.compact}
+     */
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced, boolean versionUpgrade) {
+      this.regionFolder = regionFolder;
+      this.bucket = bucket;
+      this.compactor = compactor;
+      this.isMajor = major;
+      this.isForced = isForced;
+      this.versionUpgrade = versionUpgrade;
+    }
+
+    /**
+     * Runs the compaction unless this is a non-forced request and the store currently
+     * has the matching (major/minor) compaction disabled.
+     *
+     * @return status carrying the compactor's result; a false status when the request
+     *         was ignored or the compaction failed with an IOException
+     */
+    @Override
+    public CompactionStatus call() throws Exception {
+      HDFSStore store = compactor.getHdfsStore();
+      if (!isForced) {
+        // this is an auto generated compaction request. If auto compaction is
+        // disabled, ignore this call.
+        if (isMajor && !store.getMajorCompaction()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Major compaction is disabled. Ignoring request", logPrefix);
+          }
+          return new CompactionStatus(bucket, false);
+        } else if (!isMajor && !store.getMinorCompaction()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Minor compaction is disabled. Ignoring request", logPrefix);
+          }
+          return new CompactionStatus(bucket, false);
+        }
+      }
+
+      // all hurdles passed, execute compaction now
+      try {
+        boolean status = compactor.compact(isMajor, versionUpgrade);
+        return new CompactionStatus(bucket, status);
+      } catch (IOException e) {
+        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_COMPACTION_ERROR, bucket), e);
+        return new CompactionStatus(bucket, false);
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + bucket;
+      result = prime * result
+          + ((regionFolder == null) ? 0 : regionFolder.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      CompactionRequest other = (CompactionRequest) obj;
+      if (bucket != other.bucket)
+        return false;
+      if (regionFolder == null) {
+        if (other.regionFolder != null)
+          return false;
+      } else if (!regionFolder.equals(other.regionFolder))
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "CompactionRequest [regionFolder=" + regionFolder + ", bucket="
+          + bucket + ", isMajor=" + isMajor + ", isForced="+isForced+"]";
+    }
+  }
+
+  /**
+   * Helper class for creating named instances of compaction threads and managing the
+   * compaction executor. The pool is created with equal core and maximum size, a
+   * bounded queue and a 5 second keep-alive; core threads are allowed to time out.
+   * The pool size is re-read from the store configuration on every submission.
+   */
+  private class CompactionExecutor extends ThreadPoolExecutor implements ThreadFactory {
+    final AtomicInteger count = new AtomicInteger(1);
+    private final String name;
+
+    CompactionExecutor(int max, int capacity, String name) {
+      super(max, max, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(capacity));
+      allowCoreThreadTimeOut(true);
+      setThreadFactory(this);
+      this.name = name;
+    }
+
+    /**
+     * Rejects a non-forced request when the store currently has the matching
+     * (major/minor) compaction disabled.
+     *
+     * @throws CompactionIsDisabled if the request must not be executed
+     */
+    private void throwIfStopped(CompactionRequest req, HDFSStore storeConfig) {
+      // check if compaction is enabled everytime. Alter may change this value
+      // so this check is needed everytime
+      boolean isEnabled = req.isMajor ? storeConfig.getMajorCompaction()
+          : storeConfig.getMinorCompaction();
+      if (isEnabled || req.isForced) {
+        return;
+      }
+      throw new CompactionIsDisabled(name + " is disabled");
+    }
+
+    /**
+     * Applies the currently configured thread count to this pool and rejects a
+     * non-forced request while more threads are active than the new size allows.
+     *
+     * @throws CompactionIsDisabled if the pool is still above the target size
+     */
+    private void throwIfPoolSizeChanged(CompactionRequest task, HDFSStore config) {
+      int threadCount = task.isMajor ? config.getMajorCompactionThreads()
+          : config.getMinorCompactionThreads();
+
+      resizePool(threadCount);
+
+      if (!task.isForced && getActiveCount() > threadCount) {
+        // the number of active threads is more than new max pool size. Throw
+        // error if this is system generated compaction request
+        throw new CompactionIsDisabled(
+            "Rejecting to reduce the number of threads for " + name
+            + ", currently:" + getActiveCount() + " target:"
+            + threadCount);
+      }
+    }
+
+    /**
+     * Moves both core and maximum pool size to {@code threadCount}. The two setters
+     * are ordered so that the core size never exceeds the maximum at any point, which
+     * newer JDKs reject with an IllegalArgumentException.
+     */
+    private void resizePool(int threadCount) {
+      int currentMax = getMaximumPoolSize();
+      if (threadCount > currentMax) {
+        // growing: raise the ceiling before the core size
+        setMaximumPoolSize(threadCount);
+        setCorePoolSize(threadCount);
+        return;
+      }
+
+      // shrinking or unchanged: lower the core size before the ceiling
+      if (threadCount != getCorePoolSize()) {
+        setCorePoolSize(threadCount);
+      }
+      // a maximum of zero is not a legal pool configuration, so leave the ceiling alone then
+      if (threadCount > 0 && threadCount != currentMax) {
+        setMaximumPoolSize(threadCount);
+      }
+    }
+
+    /**
+     * Validates the request against the current store configuration before handing
+     * it to the pool. Only {@link CompactionRequest} tasks are supported.
+     *
+     * @throws CompactionIsDisabled if the request is refused by the checks above
+     */
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      CompactionRequest request = (CompactionRequest) task;
+      throwIfStopped(request, HDFSCompactionManager.this.storeConfig);
+      throwIfPoolSizeChanged(request, HDFSCompactionManager.this.storeConfig);
+
+      if (logger.isDebugEnabled()) {
+        fineLog("New:", task, " pool:", getPoolSize(), " active:", getActiveCount());
+      }
+      return super.submit(task);
+    }
+
+    /** Creates a daemon thread named after this executor plus a running counter. */
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r, name + ":" + count.getAndIncrement());
+      thread.setDaemon(true);
+      if (logger.isDebugEnabled()) {
+        fineLog("New thread:", name, " poolSize:", getPoolSize(),
+            " active:", getActiveCount());
+      }
+      return thread;
+    }
+  }
+
+  /**
+   * Thrown by the compaction executor when a request is refused because compaction
+   * is disabled or the pool is being reduced.
+   */
+  public static class CompactionIsDisabled extends RejectedExecutionException {
+    private static final long serialVersionUID = 1L;
+    public CompactionIsDisabled(String name) {
+      super(name);
+    }
+  }
+
+  /**
+   * Logs the concatenation of the given parts at debug level, preceded by the log
+   * prefix. Null parts are rendered as "null". The text is passed as a parameter,
+   * not as the format string, so braces inside it are logged literally.
+   */
+  private void fineLog(Object... strings) {
+    if (logger.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      for (Object str : strings) {
+        sb.append(str);
+      }
+      logger.debug("{}{}", logPrefix, sb.toString());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
new file mode 100644
index 0000000..36e171b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Arguments of a flush queue request: the set of bucket ids and a maximum
+ * wait time in milliseconds.
+ */
+@SuppressWarnings("serial")
+public class HDFSFlushQueueArgs implements VersionedDataSerializable {
+
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+  private HashSet<Integer> buckets;
+
+  private long maxWaitTimeMillis;
+
+  /** No-argument constructor; the fields are filled in by {@link #fromData(DataInput)}. */
+  public HDFSFlushQueueArgs() {
+  }
+
+  /**
+   * @param buckets bucket ids; copied into a set owned by this object
+   * @param maxWaitTime maximum wait time in milliseconds
+   */
+  public HDFSFlushQueueArgs(Set<Integer> buckets, long maxWaitTime) {
+    this.maxWaitTimeMillis = maxWaitTime;
+    this.buckets = new HashSet<Integer>(buckets);
+  }
+
+  /** Writes the bucket set followed by the maximum wait time. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashSet(this.buckets, out);
+    out.writeLong(this.maxWaitTimeMillis);
+  }
+
+  /** Reads the bucket set followed by the maximum wait time, mirroring {@link #toData(DataOutput)}. */
+  @Override
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    buckets = DataSerializer.readHashSet(in);
+    maxWaitTimeMillis = in.readLong();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+
+  /** @return the bucket set held by this object (not a copy) */
+  public Set<Integer> getBuckets() {
+    return this.buckets;
+  }
+
+  /**
+   * @param buckets bucket ids; copied into a new set owned by this object
+   */
+  public void setBuckets(Set<Integer> buckets) {
+    HashSet<Integer> copy = new HashSet<Integer>(buckets);
+    this.buckets = copy;
+  }
+
+  /** @return true when the maximum wait time is zero */
+  public boolean isSynchronous() {
+    return 0 == this.maxWaitTimeMillis;
+  }
+
+  /** @return the maximum wait time in milliseconds */
+  public long getMaxWaitTime() {
+    return maxWaitTimeMillis;
+  }
+
+  /** @return class name, identity hash code, buckets and maximum wait time in a single line */
+  @Override
+  public String toString() {
+    return getClass().getCanonicalName() + "@" + System.identityHashCode(this)
+        + " buckets:" + buckets
+        + " maxWaitTime:" + maxWaitTimeMillis;
+  }
+}