You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/27 23:16:49 UTC
[17/22] incubator-geode git commit: GEODE-1072: Removing HDFS related
code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
deleted file mode 100644
index 82e2bf9..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.internal.DataSerializableFixedID;
-import com.gemstone.gemfire.internal.cache.CachedDeserializable;
-import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.lru.Sizeable;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Base class for events that are persisted in HDFS. Only some of the
- * EntryEventImpl variables need to be persisted, so this class carries those
- * and supplies its own toData and fromData functions.
- *
- * Subclasses cover the different types of persisted events: sorted vs.
- * unsorted, and the persisted events kept in the region queue, which also
- * need to hold the region key.
- */
-public abstract class PersistedEventImpl {
-  // Bits of the flags field describing the type of value. If the value is
-  // neither a byte array nor an object, it is an internal delta.
-  private static final byte VALUE_IS_BYTE_ARRAY = 0x01;
-  private static final byte VALUE_IS_OBJECT = (VALUE_IS_BYTE_ARRAY << 1);
-  private static final byte POSSIBLE_DUPLICATE = (VALUE_IS_OBJECT << 1);
-  private static final byte HAS_VERSION_TAG = (POSSIBLE_DUPLICATE << 1);
-
-  /** The operation of this event; UPDATE until set or deserialized. */
-  protected Operation op = Operation.UPDATE;
-
-  /** The event value, in the representation indicated by the flags. */
-  protected Object valueObject;
-
-  /**
-   * A field with flags describing the event.
-   */
-  protected byte flags;
-
-  /** For deserialization. */
-  public PersistedEventImpl() {
-  }
-
-  /**
-   * @param value the event value
-   * @param op the operation of the event
-   * @param valueIsObject 0x00 flags the value as a byte array, 0x01 flags it
-   *        as an object; any other code leaves both value flags clear
-   * @param isPossibleDuplicate recorded in the POSSIBLE_DUPLICATE flag
-   * @param hasVersionTag recorded in the HAS_VERSION_TAG flag
-   */
-  public PersistedEventImpl(Object value, Operation op, byte valueIsObject,
-      boolean isPossibleDuplicate, boolean hasVersionTag) throws IOException,
-      ClassNotFoundException {
-    this.valueObject = value;
-    this.op = op;
-
-    final boolean byteArrayValue = valueIsObject == 0x00;
-    final boolean objectValue = valueIsObject == 0x01;
-    setFlag(VALUE_IS_BYTE_ARRAY, byteArrayValue);
-    setFlag(VALUE_IS_OBJECT, objectValue);
-    setFlag(POSSIBLE_DUPLICATE, isPossibleDuplicate);
-    setFlag(HAS_VERSION_TAG, hasVersionTag);
-  }
-
-  /** Turns the given flag bit on or off in the flags field. */
-  private void setFlag(byte flag, boolean set) {
-    if (set) {
-      flags |= flag;
-    } else {
-      flags &= ~flag;
-    }
-  }
-
-  /** Reports whether the given flag bit is on in the flags field. */
-  private boolean getFlag(byte flag) {
-    return 0x0 != (flags & flag);
-  }
-
-  /**
-   * Writes the operation ordinal, the flags byte and then the value, encoded
-   * according to the value flags.
-   */
-  public void toData(DataOutput out) throws IOException {
-    out.writeByte(this.op.ordinal);
-    out.writeByte(this.flags);
-
-    if (getFlag(VALUE_IS_BYTE_ARRAY)) {
-      DataSerializer.writeByteArray((byte[]) this.valueObject, out);
-      return;
-    }
-    if (!getFlag(VALUE_IS_OBJECT)) {
-      // neither value flag is set: internal delta
-      DataSerializer.writeObject(valueObject, out);
-      return;
-    }
-
-    Object payload = valueObject;
-    if (payload instanceof CachedDeserializable) {
-      payload = ((CachedDeserializable) payload).getValue();
-    }
-    DataSerializer.writeObjectAsByteArray(payload, out);
-  }
-
-  /**
-   * Reads the operation, the flags byte and then the value, decoded according
-   * to the value flags that were just read.
-   */
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this.op = Operation.fromOrdinal(in.readByte());
-    this.flags = in.readByte();
-
-    if (getFlag(VALUE_IS_BYTE_ARRAY)) {
-      this.valueObject = DataSerializer.readByteArray(in);
-      return;
-    }
-    if (!getFlag(VALUE_IS_OBJECT)) {
-      // neither value flag is set: internal delta
-      this.valueObject = DataSerializer.readObject(in);
-      return;
-    }
-
-    final byte[] serialized = DataSerializer.readByteArray(in);
-    if (serialized == null) {
-      this.valueObject = null;
-    } else if (CachedDeserializableFactory.preferObject()) {
-      this.valueObject = EntryEventImpl.deserialize(serialized);
-    } else {
-      this.valueObject = CachedDeserializableFactory.create(serialized);
-    }
-  }
-
-  /**
-   * Return the timestamp of this event. Depending on the subclass,
-   * this may be part of the version tag, or a separate field.
-   */
-  public abstract long getTimstamp();
-
-  protected boolean hasVersionTag() {
-    return getFlag(HAS_VERSION_TAG);
-  }
-
-  public Operation getOperation() {
-    return this.op;
-  }
-
-  public Object getValue() {
-    return this.valueObject;
-  }
-
-  public boolean isPossibleDuplicate() {
-    return getFlag(POSSIBLE_DUPLICATE);
-  }
-
-  /**
-   * Returns the deserialized value.
-   */
-  public Object getDeserializedValue() throws IOException, ClassNotFoundException {
-    final boolean flaggedAsObject = !getFlag(VALUE_IS_BYTE_ARRAY) && getFlag(VALUE_IS_OBJECT);
-    if (flaggedAsObject && valueObject instanceof CachedDeserializable) {
-      return ((CachedDeserializable) valueObject).getDeserializedForReading();
-    }
-    // byte arrays, objects that are not wrapped in a CachedDeserializable and
-    // internal deltas are all returned as held
-    return this.valueObject;
-  }
-
-  @Override
-  public String toString() {
-    return PersistedEventImpl.class.getSimpleName()
-        + "@" + System.identityHashCode(this)
-        + " op:" + op
-        + " valueObject:" + valueObject
-        + " isPossibleDuplicate:" + getFlag(POSSIBLE_DUPLICATE);
-  }
-
-  /** Copies the operation, value and flags of the given event into this one. */
-  public void copy(PersistedEventImpl usersValue) {
-    this.op = usersValue.op;
-    this.valueObject = usersValue.valueObject;
-    this.flags = usersValue.flags;
-  }
-
-  /**
-   * @return the value length plus one byte for op and one byte for flag;
-   *         keySize and versionTag do not contribute at this level
-   */
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    return valueSize + 2;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
deleted file mode 100644
index bd7994c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-public interface QueuedPersistentEvent {
-
-  /**
-   * @return the bytes of the key of this event
-   */
-  byte[] getRawKey();
-
-  /**
-   * Writes the hoplog event bytes of this event to the given output.
-   *
-   * @param out the output to write to
-   * @throws IOException if writing to the output fails
-   */
-  void toHoplogEventBytes(DataOutput out) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
deleted file mode 100644
index b97bdb7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Tracks flushes using a queue of latches. A flush request that still has
- * undelivered events ahead of it registers a latch tagged with the number of
- * events received so far; the latch is released once at least that many events
- * have been delivered, or when the observer is cleared.
- */
-public class SignalledFlushObserver implements FlushObserver {
-  /** A single-count latch tagged with the sequence number it waits for. */
-  private static class FlushLatch extends CountDownLatch {
-    private final long seqnum;
-
-    public FlushLatch(long seqnum) {
-      super(1);
-      this.seqnum = seqnum;
-    }
-
-    public long getSequence() {
-      return seqnum;
-    }
-  }
-
-  // assume the number of outstanding flush requests is small so we don't
-  // need to organize by seqnum; also serves as the lock guarding the list
-  private final List<FlushLatch> signals;
-
-  private final AtomicLong eventsReceived;
-  private final AtomicLong eventsDelivered;
-
-  public SignalledFlushObserver() {
-    signals = new ArrayList<FlushLatch>();
-    eventsReceived = new AtomicLong(0);
-    eventsDelivered = new AtomicLong(0);
-  }
-
-  /**
-   * @return true while at least one flush request is still waiting
-   */
-  @Override
-  public boolean shouldDrainImmediately() {
-    synchronized (signals) {
-      return !signals.isEmpty();
-    }
-  }
-
-  /**
-   * Requests a flush of everything received so far.
-   *
-   * @return a result whose waitForFlush returns true immediately when all
-   *         received events were already delivered, and otherwise waits on a
-   *         latch released by pop or clear
-   */
-  @Override
-  public AsyncFlushResult flush() {
-    final FlushLatch flush;
-    synchronized (signals) {
-      // Sample the received count while holding the lock. clear() resets both
-      // counters under this same lock, so sampling before acquiring it could
-      // pair a pre-clear sequence number with post-clear counters and
-      // register a latch for events that no longer exist.
-      final long seqnum = eventsReceived.get();
-      if (seqnum <= eventsDelivered.get()) {
-        flush = null;
-      } else {
-        flush = new FlushLatch(seqnum);
-        signals.add(flush);
-      }
-    }
-
-    return new AsyncFlushResult() {
-      @Override
-      public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
-        return flush == null || flush.await(timeout, unit);
-      }
-    };
-  }
-
-  /**
-   * Invoked when an event is received.
-   */
-  public void push() {
-    eventsReceived.incrementAndGet();
-  }
-
-  /**
-   * Invoked when a batch has been dispatched.
-   *
-   * @param count the number of events added to the delivered total
-   */
-  public void pop(int count) {
-    long highmark = eventsDelivered.addAndGet(count);
-    synchronized (signals) {
-      for (ListIterator<FlushLatch> iter = signals.listIterator(); iter.hasNext(); ) {
-        FlushLatch flush = iter.next();
-        if (flush.getSequence() <= highmark) {
-          // everything this request was waiting for has been delivered
-          flush.countDown();
-          iter.remove();
-        }
-      }
-    }
-  }
-
-  /**
-   * Invoked when the queue is cleared. Releases every waiting flush request
-   * and resets both event counters.
-   */
-  public void clear() {
-    synchronized (signals) {
-      for (FlushLatch flush : signals) {
-        flush.countDown();
-      }
-
-      signals.clear();
-      eventsReceived.set(0);
-      eventsDelivered.set(0);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
deleted file mode 100644
index c725ce5..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.DataSerializableFixedID;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * A persistent event that is stored in the hoplog queue. This class is only used
- * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
- * record in the file.
- */
-public class SortedHDFSQueuePersistedEvent extends SortedHoplogPersistedEvent implements QueuedPersistentEvent {
-
-  /** Key stored in serialized form. */
-  protected byte[] keyBytes = null;
-
-  /** Builds the event from the state of the given gateway event. */
-  public SortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
-      ClassNotFoundException {
-    this(in.getSerializedValue(), in.getOperation(), in.getValueIsObject(),
-        in.getPossibleDuplicate(), in.getVersionTag(), in.getSerializedKey(),
-        in.getCreationTime());
-  }
-
-  /**
-   * Passes everything except the key to the superclass constructor and keeps
-   * the serialized key in this instance.
-   */
-  public SortedHDFSQueuePersistedEvent(Object valueObject, Operation operation,
-      byte valueIsObject, boolean possibleDuplicate, VersionTag versionTag,
-      byte[] serializedKey, long timestamp) throws ClassNotFoundException, IOException {
-    super(valueObject, operation, valueIsObject, possibleDuplicate, versionTag, timestamp);
-    this.keyBytes = serializedKey;
-  }
-
-  /** Writes the superclass data followed by the key bytes. */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeByteArray(this.keyBytes, out);
-  }
-
-  /** Reads the superclass data followed by the key bytes. */
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.keyBytes = DataSerializer.readByteArray(in);
-  }
-
-  /**
-   * Writes only the superclass data; unlike toData, the key bytes are not
-   * appended.
-   */
-  @Override
-  public void toHoplogEventBytes(DataOutput out) throws IOException {
-    super.toData(out);
-  }
-
-  @Override
-  public byte[] getRawKey() {
-    return this.keyBytes;
-  }
-
-  /**
-   * @return the size computed by SortedHoplogPersistedEvent plus keySize
-   */
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    return SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag) + keySize;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
deleted file mode 100644
index e8be7b8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.internal.ByteArrayDataInput;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-
-/**
- * A persistent event that is stored in a sorted hoplog. In addition
- * to the fields of PersistedEventImpl, this event has a version tag.
- *
- * This class should only be serialized by directly calling toData,
- * which is why it does not implement DataSerializable
- */
-public class SortedHoplogPersistedEvent extends PersistedEventImpl {
-  /** version tag for concurrency checks */
-  protected VersionTag versionTag;
-
-  /** timestamp of the event. Used when version checks are disabled */
-  protected long timestamp;
-
-  public SortedHoplogPersistedEvent() {
-    //for deserialization
-  }
-
-  /**
-   * The has-version-tag flag handed to the superclass is set exactly when
-   * the given tag is non-null.
-   */
-  public SortedHoplogPersistedEvent(Object valueObject, Operation operation,
-      byte valueIsObject, boolean possibleDuplicate, VersionTag tag, long timestamp)
-      throws ClassNotFoundException, IOException {
-    super(valueObject, operation, valueIsObject, possibleDuplicate, tag != null);
-    this.versionTag = tag;
-    this.timestamp = timestamp;
-  }
-
-  /**
-   * @return the version tag's timestamp when a tag is present, otherwise the
-   *         separate timestamp field
-   */
-  @Override
-  public long getTimstamp() {
-    if (versionTag != null) {
-      return versionTag.getVersionTimeStamp();
-    }
-    return timestamp;
-  }
-
-  /**
-   * Writes the superclass data followed by either the version tag or, when
-   * there is no tag, the timestamp. Note that the choice here is made on the
-   * versionTag field, whereas fromData makes it on the has-version-tag flag.
-   */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    if (versionTag != null) {
-      //TODO optimize these
-      DataSerializer.writeObject(this.versionTag, out);
-    } else {
-      out.writeLong(timestamp);
-    }
-  }
-
-  /**
-   * Reads the superclass data followed by either the version tag or the
-   * timestamp, as indicated by the has-version-tag flag.
-   */
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    if (!hasVersionTag()) {
-      this.timestamp = in.readLong();
-    } else {
-      this.versionTag = (VersionTag) DataSerializer.readObject(in);
-    }
-  }
-
-  /**
-   * @return the concurrency versioning tag for this event, if any
-   */
-  public VersionTag getVersionTag() {
-    return this.versionTag;
-  }
-
-  /** Creates an event by running fromData over the given bytes. */
-  public static SortedHoplogPersistedEvent fromBytes(byte[] val)
-      throws IOException, ClassNotFoundException {
-    ByteArrayDataInput input = new ByteArrayDataInput();
-    input.initialize(val, null);
-    SortedHoplogPersistedEvent result = new SortedHoplogPersistedEvent();
-    result.fromData(input);
-    return result;
-  }
-
-  /**
-   * Copies the superclass state, then the version tag and timestamp. The
-   * argument is cast to SortedHoplogPersistedEvent.
-   */
-  @Override
-  public void copy(PersistedEventImpl usersValue) {
-    super.copy(usersValue);
-    final SortedHoplogPersistedEvent source = (SortedHoplogPersistedEvent) usersValue;
-    this.versionTag = source.versionTag;
-    this.timestamp = source.timestamp;
-  }
-
-  /**
-   * @return the size computed by PersistedEventImpl plus either the size of
-   *         the version tag or, when there is no tag, 8 bytes of timestamp
-   */
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    int total = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
-    if (versionTag == null) {
-      // size of Timestamp
-      total += 8;
-    } else {
-      total += versionTag.getSizeInBytes();
-    }
-    return total;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
deleted file mode 100644
index 93d596b..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-
-
-/**
- * A persistent event that is stored in the hoplog queue. This class is only used
- * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
- * record in the file.
- */
-public class UnsortedHDFSQueuePersistedEvent extends UnsortedHoplogPersistedEvent implements QueuedPersistentEvent {
-
-  /** The bytes of the key for this entry. */
-  protected byte[] keyBytes = null;
-
-  /** Builds the event from the state of the given gateway event. */
-  public UnsortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in) throws IOException,
-      ClassNotFoundException {
-    super(in.getValue(), in.getOperation(), in.getValueIsObject(), in.getPossibleDuplicate(),
-        timestampOf(in));
-    this.keyBytes = in.getSerializedKey();
-  }
-
-  /**
-   * Picks the version timestamp of the given event, falling back to its
-   * creation time when the version timestamp is 0.
-   */
-  private static long timestampOf(HDFSGatewayEventImpl in) {
-    if (in.getVersionTimeStamp() == 0) {
-      return in.getCreationTime();
-    }
-    return in.getVersionTimeStamp();
-  }
-
-  /** Writes the superclass data followed by the key bytes. */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeByteArray(this.keyBytes, out);
-  }
-
-  /** Reads the superclass data followed by the key bytes. */
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.keyBytes = DataSerializer.readByteArray(in);
-  }
-
-  /**
-   * Writes only the superclass data; unlike toData, the key bytes are not
-   * appended.
-   */
-  @Override
-  public void toHoplogEventBytes(DataOutput out) throws IOException {
-    super.toData(out);
-  }
-
-  @Override
-  public byte[] getRawKey() {
-    return this.keyBytes;
-  }
-
-  /**
-   * @return the size computed by UnsortedHoplogPersistedEvent plus keySize
-   */
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    return UnsortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag) + keySize;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
deleted file mode 100644
index 9b9a04d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.internal.ByteArrayDataInput;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-
-/**
- * A persisted event that is stored in an unsorted (sequential) hoplog. This
- * does not have a version stamp, but just a timestamp for the entry.
- *
- * This class should only be serialized by calling toData directly, which
- * is why it does not implement DataSerializable.
- */
-public class UnsortedHoplogPersistedEvent extends PersistedEventImpl {
-  /** timestamp of the entry */
-  long timestamp;
-
-  public UnsortedHoplogPersistedEvent() {
-    //for deserialization
-  }
-
-  /**
-   * Passes the value, operation and flags to the superclass with the
-   * has-version-tag flag off, and keeps the given timestamp.
-   */
-  public UnsortedHoplogPersistedEvent(Object value, Operation op,
-      byte valueIsObject, boolean isPossibleDuplicate, long timestamp) throws IOException,
-      ClassNotFoundException {
-    super(value, op, valueIsObject, isPossibleDuplicate, false/*hasVersionTag*/);
-    this.timestamp = timestamp;
-  }
-
-  @Override
-  public long getTimstamp() {
-    return this.timestamp;
-  }
-
-  /** Writes the superclass data followed by the timestamp. */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeLong(this.timestamp, out);
-  }
-
-  /** Reads the superclass data followed by the timestamp. */
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.timestamp = DataSerializer.readLong(in);
-  }
-
-  /** Creates an event by running fromData over the given bytes. */
-  public static UnsortedHoplogPersistedEvent fromBytes(byte[] val)
-      throws IOException, ClassNotFoundException {
-    ByteArrayDataInput input = new ByteArrayDataInput();
-    input.initialize(val, null);
-    UnsortedHoplogPersistedEvent result = new UnsortedHoplogPersistedEvent();
-    result.fromData(input);
-    return result;
-  }
-
-  /**
-   * Copies the superclass state, then the timestamp. The argument is cast to
-   * UnsortedHoplogPersistedEvent.
-   */
-  @Override
-  public void copy(PersistedEventImpl usersValue) {
-    super.copy(usersValue);
-    final UnsortedHoplogPersistedEvent source = (UnsortedHoplogPersistedEvent) usersValue;
-    this.timestamp = source.timestamp;
-  }
-
-  /**
-   * @return the size computed by PersistedEventImpl plus 8 bytes of timestamp
-   */
-  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
-    // size of Timestamp
-    return PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag) + 8;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
deleted file mode 100644
index d2fdbe7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.regex.Matcher;
-
-import com.gemstone.gemfire.internal.hll.ICardinality;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.CompressionType;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Abstract class for {@link Hoplog} with common functionality
- */
-public abstract class AbstractHoplog implements Hoplog {
- protected final FSProvider fsProvider;
-
- // path of the oplog file
- protected volatile Path path;
- private volatile HoplogDescriptor hfd;
- protected Configuration conf;
- protected SortedOplogStatistics stats;
- protected Long hoplogModificationTime;
- protected Long hoplogSize;
-
- protected HoplogReaderActivityListener readerListener;
-
- // logger instance
- protected static final Logger logger = LogService.getLogger();
-
- protected static String logPrefix;
- // THIS CONSTRUCTOR SHOULD BE USED FOR LONER ONLY
- AbstractHoplog(FileSystem inputFS, Path filePath, SortedOplogStatistics stats)
- throws IOException {
- logPrefix = "<" + filePath.getName() + "> ";
- this.fsProvider = new FSProvider(inputFS);
- initialize(filePath, stats, inputFS);
- }
-
- public AbstractHoplog(HDFSStoreImpl store, Path filePath,
- SortedOplogStatistics stats) throws IOException {
- logPrefix = "<" + filePath.getName() + "> ";
- this.fsProvider = new FSProvider(store);
- initialize(filePath, stats, store.getFileSystem());
- }
-
- private void initialize(Path path, SortedOplogStatistics stats, FileSystem fs) {
- this.conf = fs.getConf();
- this.stats = stats;
- this.path = fs.makeQualified(path);
- this.hfd = new HoplogDescriptor(this.path.getName());
- }
-
- // The following operations are re-declared abstract; every concrete hoplog
- // type has to provide them.
- @Override
- public abstract void close() throws IOException;
- @Override
- public abstract HoplogReader getReader() throws IOException;
-
- // NOTE(review): 'keys' is presumably the expected number of entries the
- // writer should be sized for - confirm against the implementations.
- @Override
- public abstract HoplogWriter createWriter(int keys) throws IOException;
-
- // NOTE(review): 'clearCache' presumably controls whether cached data for this
- // hoplog is dropped on close - confirm against the implementations.
- @Override
- abstract public void close(boolean clearCache) throws IOException;
-
- /** Stores the listener in {@code readerListener}, replacing any previous one. */
- @Override
- public void setReaderActivityListener(HoplogReaderActivityListener listener) {
-   readerListener = listener;
- }
-
- /** @return the file name held by this hoplog's current descriptor */
- @Override
- public String getFileName() {
-   final HoplogDescriptor descriptor = this.hfd;
-   return descriptor.getFileName();
- }
-
- /**
-  * Orders hoplogs by comparing their descriptors. The argument is cast to
-  * {@code AbstractHoplog} unconditionally.
-  */
- public final int compareTo(Hoplog o) {
-   final HoplogDescriptor mine = hfd;
-   final HoplogDescriptor theirs = ((AbstractHoplog) o).hfd;
-   return mine.compareTo(theirs);
- }
-
- /**
-  * Default implementation: no estimate is available, so {@code null} is returned.
-  */
- @Override
- public ICardinality getEntryCountEstimate() throws IOException {
- return null;
- }
-
- /**
-  * Renames the underlying file to {@code name} inside its current parent
-  * directory, closes the open reader and repoints this hoplog (path,
-  * descriptor and log prefix) at the new file.
-  *
-  * @param name new file name; must match the hoplog name pattern expected by
-  *             {@link HoplogDescriptor}
-  * @throws IOException if the file system reports an error or refuses the
-  *                     rename; in that case this hoplog keeps its old path
-  */
- @Override
- public synchronized void rename(String name) throws IOException {
-   if (logger.isDebugEnabled()) {
-     logger.debug("{}Renaming hoplog to {}", logPrefix, name);
-   }
-
-   final Path oldPath = path;
-   final Path newPath = new Path(oldPath.getParent(), name);
-
-   // FileSystem.rename signals most failures through its boolean result; do not
-   // repoint this hoplog at a file that was never created
-   if (!fsProvider.getFS().rename(oldPath, newPath)) {
-     throw new IOException("Failed to rename hoplog " + oldPath + " to " + newPath);
-   }
-
-   // close the old reader and let the new one get created lazily
-   close();
-
-   // update path, descriptor and log prefix to reflect the new name
-   path = newPath;
-   this.hfd = new HoplogDescriptor(newPath.getName());
-   logPrefix = "<" + newPath.getName() + "> ";
- }
-
- /**
-  * Closes this hoplog, discards the cached size and modification time, and
-  * removes the file (non-recursively) from the file system.
-  */
- @Override
- public synchronized void delete() throws IOException {
-   if (logger.isDebugEnabled()) {
-     logger.debug("{}Deleting hoplog", logPrefix);
-   }
-
-   close();
-
-   // cached metadata is meaningless once the file is gone
-   this.hoplogSize = null;
-   this.hoplogModificationTime = null;
-
-   fsProvider.getFS().delete(path, false);
- }
-
- /**
-  * Returns the modification time of the hoplog file, loading it from the file
-  * system on first use.
-  *
-  * @return the cached modification time
-  * @throws IllegalStateException if the time could not be determined, which
-  *         happens when the file status lookup does not yield exactly one entry
-  */
- @Override
- public long getModificationTimeStamp() {
-   initHoplogSizeTimeInfo();
-
-   // Read the field once: delete() may null it concurrently, and a second read
-   // would then fail with a NullPointerException while unboxing.
-   final Long modified = hoplogModificationTime;
-   if (modified == null) {
-     // the time is only unknown when the hoplog file does not exist; calling
-     // this method in that state is invalid
-     throw new IllegalStateException("Modification time is not available for hoplog " + path);
-   }
-
-   return modified;
- }
-
- /**
-  * Returns the length of the hoplog file, loading it from the file system on
-  * first use.
-  *
-  * @return the cached file length
-  * @throws IllegalStateException if the size could not be determined, which
-  *         happens when the file status lookup does not yield exactly one entry
-  */
- @Override
- public long getSize() {
-   initHoplogSizeTimeInfo();
-
-   // Read the field once: delete() may null it concurrently, and a second read
-   // would then fail with a NullPointerException while unboxing.
-   final Long size = hoplogSize;
-   if (size == null) {
-     // the size is only unknown when the hoplog file does not exist; calling
-     // this method in that state is invalid
-     throw new IllegalStateException("Size is not available for hoplog " + path);
-   }
-
-   return size;
- }
-
- private synchronized void initHoplogSizeTimeInfo() {
- if (hoplogSize != null && hoplogModificationTime != null) {
- // time and size info is already initialized. no work needed here
- return;
- }
-
- try {
- FileStatus[] filesInfo = FSUtils.listStatus(fsProvider.getFS(), path, null);
- if (filesInfo != null && filesInfo.length == 1) {
- this.hoplogModificationTime = filesInfo[0].getModificationTime();
- this.hoplogSize = filesInfo[0].getLen();
- }
- // TODO else condition may happen if user deletes hoplog from the file system.
- } catch (IOException e) {
- logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
- throw new HDFSIOException(
- LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),e);
- }
- }
- /**
-  * Convenience overload that passes {@code null} as the version to the
-  * four-argument {@code getSequenceFileWriter}.
-  */
- public static SequenceFile.Writer getSequenceFileWriter(Path path,
-     Configuration conf, Logger logger) throws IOException {
-   final Version noVersionOverride = null;
-   return getSequenceFileWriter(path, conf, logger, noVersionOverride);
- }
-
- /**
-  * Creates a sequence-file writer for {@code path} with {@code BytesWritable}
-  * keys and values, the compression chosen by {@code withCompression}, and a
-  * metadata entry recording the ordinal of the version in use.
-  *
-  * @param path    file to write
-  * @param conf    configuration handed to {@code SequenceFile.createWriter}
-  * @param logger  logger used for debug output
-  * @param version used only for testing; pass {@code null} otherwise, in which
-  *                case {@code Version.CURRENT} is recorded
-  * @return the newly created writer
-  * @throws IOException if the writer cannot be created
-  */
- public static SequenceFile.Writer getSequenceFileWriter(Path path,
-     Configuration conf, Logger logger, Version version) throws IOException {
-   final Option fileOption = SequenceFile.Writer.file(path);
-   final Option keyOption = SequenceFile.Writer.keyClass(BytesWritable.class);
-   final Option valueOption = SequenceFile.Writer.valueClass(BytesWritable.class);
-   final Option compressionOption = withCompression(logger);
-
-   if (logger.isDebugEnabled()) {
-     logger.debug("{}Started creating hoplog " + path, logPrefix);
-   }
-
-   final Version effectiveVersion = (version != null) ? version : Version.CURRENT;
-
-   // record the gemfire version in the file metadata, for future versioning of
-   // the key and value format
-   final SequenceFile.Metadata metadata = new SequenceFile.Metadata();
-   final Text versionKey = new Text(Meta.GEMFIRE_VERSION.name());
-   final Text versionValue = new Text(String.valueOf(effectiveVersion.ordinal()));
-   metadata.set(versionKey, versionValue);
-   final Option metadataOption = SequenceFile.Writer.metadata(metadata);
-
-   return SequenceFile.createWriter(conf, fileOption, keyOption, valueOption,
-       compressionOption, metadataOption);
- }
-
- private static Option withCompression(Logger logger) {
- String prop = System.getProperty(HoplogConfig.COMPRESSION);
- if (prop != null) {
- CompressionCodec codec;
- if (prop.equalsIgnoreCase("SNAPPY")) {
- codec = new SnappyCodec();
- } else if (prop.equalsIgnoreCase("LZ4")) {
- codec = new Lz4Codec();
- } else if (prop.equals("GZ")) {
- codec = new GzipCodec();
- } else {
- throw new IllegalStateException("Unsupported codec: " + prop);
- }
- if (logger.isDebugEnabled())
- logger.debug("{}Using compression codec " + codec, logPrefix);
- return SequenceFile.Writer.compression(CompressionType.BLOCK, codec);
- }
- return SequenceFile.Writer.compression(CompressionType.NONE, null);
- }
-
- public static final class HoplogDescriptor implements Comparable<HoplogDescriptor> {
- private final String fileName;
- private final String bucket;
- private final int sequence;
- private final long timestamp;
- private final String extension;
-
- HoplogDescriptor(final String fileName) {
- this.fileName = fileName;
- final Matcher matcher = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(fileName);
- final boolean matched = matcher.find();
- assert matched;
- this.bucket = matcher.group(1);
- this.sequence = Integer.valueOf(matcher.group(3));
- this.timestamp = Long.valueOf(matcher.group(2));
- this.extension = matcher.group(4);
- }
-
- public final String getFileName() {
- return fileName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof HoplogDescriptor)) {
- return false;
- }
-
- final HoplogDescriptor other = (HoplogDescriptor)o;
- // the two files should belong to same bucket
- assert this.bucket.equals(other.bucket);
-
- // compare sequence first
- if (this.sequence != other.sequence) {
- return false;
- }
-
- // sequence is same, compare timestamps
- if (this.timestamp != other.timestamp) {
- return false;
- }
-
- return extension.equals(other.extension);
- }
-
- @Override
- public int compareTo(HoplogDescriptor o) {
- if (this == o) {
- return 0;
- }
-
- // the two files should belong to same bucket
- assert this.bucket.equals(o.bucket);
-
- // compare sequence first
- if (sequence > o.sequence) {
- return -1;
- } else if (sequence < o.sequence) {
- return 1;
- }
-
- // sequence is same, compare timestamps
- if(timestamp > o.timestamp) {
- return -1;
- } else if (timestamp < o.timestamp) {
- return 1;
- }
-
- //timestamp is the same, compare the file extension. It's
- //possible a major compaction and minor compaction could finish
- //at the same time and create the same timestamp and sequence number
- //it doesn't matter which file we look at first in that case.
- return extension.compareTo(o.extension);
- }
-
-
- }
-
- /**
-  * Supplies the file system used by a hoplog, either a fixed instance or the
-  * one currently exposed by an HDFS store. Exactly one of the two fields is
-  * non-null, depending on the constructor used.
-  */
- protected static final class FSProvider {
-   final FileSystem fs;
-   final HDFSStoreImpl store;
-
-   // THIS METHOD IS FOR TESTING ONLY
-   FSProvider(FileSystem fs) {
-     this.store = null;
-     this.fs = fs;
-   }
-
-   FSProvider(HDFSStoreImpl store) {
-     this.fs = null;
-     this.store = store;
-   }
-
-   /** @return the store's file system when a store is present, else the fixed one */
-   public FileSystem getFS() throws IOException {
-     return (store == null) ? fs : store.getFileSystem();
-   }
-
-   /**
-    * Asks the store to check and clear its file system, then returns the store's
-    * cached file system. The store is dereferenced unconditionally, so this must
-    * not be called on a provider built from a bare file system.
-    */
-   public FileSystem checkFileSystem() {
-     final HDFSStoreImpl owner = store;
-     owner.checkAndClearFileSystem();
-     return owner.getCachedFileSystem();
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
deleted file mode 100644
index 4f078d8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog.HoplogDescriptor;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-
-public abstract class AbstractHoplogOrganizer<T extends PersistedEventImpl> implements HoplogOrganizer<T> {
-
- public static final String MINOR_HOPLOG_EXTENSION = ".ihop";
- public static final String MAJOR_HOPLOG_EXTENSION = ".chop";
- public static final String EXPIRED_HOPLOG_EXTENSION = ".exp";
- public static final String TEMP_HOPLOG_EXTENSION = ".tmp";
-
- public static final String FLUSH_HOPLOG_EXTENSION = ".hop";
- public static final String SEQ_HOPLOG_EXTENSION = ".shop";
-
- // all valid hoplogs will follow the following name pattern
- public static final String HOPLOG_NAME_REGEX = "(.+?)-(\\d+?)-(\\d+?)";
- public static final Pattern HOPLOG_NAME_PATTERN = Pattern.compile(HOPLOG_NAME_REGEX
- + "\\.(.*)");
-
- public static boolean JUNIT_TEST_RUN = false;
-
- protected static final boolean ENABLE_INTEGRITY_CHECKS = Boolean
- .getBoolean("gemfire.HdfsSortedOplogOrganizer.ENABLE_INTEGRITY_CHECKS")
- || assertionsEnabled();
-
- /**
-  * Reports whether JVM assertions are enabled for this class.
-  *
-  * @return true when the assert statement below is actually executed
-  */
- private static boolean assertionsEnabled() {
- boolean enabled = false;
- // intentional side effect: the assignment only runs when assertions are on
- assert enabled = true;
- return enabled;
- }
-
- protected HdfsRegionManager regionManager;
- // name or id of bucket managed by this organizer
- protected final String regionFolder;
- protected final int bucketId;
-
- // path of the region directory
- protected final Path basePath;
- // identifies path of directory containing a bucket's oplog files
- protected final Path bucketPath;
-
- protected final HDFSStoreImpl store;
-
- // assigns a unique increasing number to each oplog file
- protected AtomicInteger sequence;
-
- //logger instance
- protected static final Logger logger = LogService.getLogger();
- protected final String logPrefix;
-
- protected SortedOplogStatistics stats;
- AtomicLong bucketDiskUsage = new AtomicLong(0);
-
- // creation of new files and expiration of files will be synchronously
- // notified to the listener
- protected HoplogListener listener;
-
- private volatile boolean closed = false;
-
- protected Object changePrimarylockObject = new Object();
-
- public AbstractHoplogOrganizer(HdfsRegionManager region, int bucketId) {
-
- assert region != null;
-
- this.regionManager = region;
- this.regionFolder = region.getRegionFolder();
- this.store = region.getStore();
- this.listener = region.getListener();
- this.stats = region.getHdfsStats();
-
- this.bucketId = bucketId;
-
- this.basePath = new Path(store.getHomeDir());
- this.bucketPath = new Path(basePath, regionFolder + "/" + bucketId);
-
- this.logPrefix = "<" + getRegionBucketStr() + "> ";
-
- }
-
- /** @return true once this organizer, or its region manager, has been closed */
- @Override
- public boolean isClosed() {
-   if (closed) {
-     return true;
-   }
-   return regionManager.isClosed();
- }
-
- /**
-  * Marks the organizer closed and subtracts this bucket's recorded disk usage
-  * from the store usage statistic.
-  */
- @Override
- public void close() throws IOException {
-   closed = true;
-
-   // this bucket is closed and may be owned by a new node. So reduce the store
-   // usage stat, as the new owner adds the usage metric
-   final long recordedUsage = bucketDiskUsage.get();
-   incrementDiskUsage(-recordedUsage);
- }
-
- // Operations every concrete organizer has to provide.
- // NOTE(review): 'count' is presumably the number of events available from
- // 'bufferIter' - confirm against the implementations.
- @Override
- public abstract void flush(Iterator<? extends QueuedPersistentEvent> bufferIter,
- int count) throws IOException, ForceReattemptException;
-
- @Override
- public abstract void clear() throws IOException;
-
- // Creates the Hoplog object for a file path; used by getTmpSortedOplog for
- // new temporary files.
- protected abstract Hoplog getHoplog(Path hoplogPath) throws IOException;
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
-     throws IOException {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs)
-     throws IOException {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public void compactionCompleted(String region, int bucket, boolean isMajor) {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public T read(byte[] key) throws IOException {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public HoplogIterator<byte[], T> scan() throws IOException {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public HoplogIterator<byte[], T> scan(byte[] from, byte[] to)
-     throws IOException {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public HoplogIterator<byte[], T> scan(byte[] from,
-     boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /** Not supported by default; always throws {@code UnsupportedOperationException}. */
- @Override
- public long sizeEstimate() {
-   final String implementation = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + implementation);
- }
-
- /**
- * @return returns an oplogs full path after prefixing bucket path to the file
- * name
- */
- protected String getPathStr(Hoplog oplog) {
- return bucketPath.toString() + "/" + oplog.getFileName();
- }
-
- protected String getRegionBucketStr() {
- return regionFolder + "/" + bucketId;
- }
-
- /**
-  * Decodes a serialized event.
-  *
-  * @return the decoded event, or {@code null} when a class needed for
-  *         deserialization cannot be found (the failure is logged)
-  */
- protected SortedHoplogPersistedEvent deserializeValue(byte[] val) throws IOException {
-   SortedHoplogPersistedEvent event;
-   try {
-     event = SortedHoplogPersistedEvent.fromBytes(val);
-   } catch (ClassNotFoundException e) {
-     logger.error(
-         LocalizedStrings.GetMessage_UNABLE_TO_DESERIALIZE_VALUE_CLASSNOTFOUNDEXCEPTION, e);
-     event = null;
-   }
-   return event;
- }
-
- /**
- * @return true if the entry belongs to an destroy event
- */
- protected boolean isDeletedEntry(byte[] value, int offset) throws IOException {
- // Read only the first byte of PersistedEventImpl for the operation
- assert value != null && value.length > 0 && offset >= 0 && offset < value.length;
- Operation op = Operation.fromOrdinal(value[offset]);
-
- if (op.isDestroy() || op.isInvalidate()) {
- return true;
- }
- return false;
- }
-
- /**
- * @param seqNum
- * desired sequence number of the hoplog. If null a highest number is
- * choosen
- * @param extension
- * file extension representing the type of file, e.g. ihop for
- * intermediate hoplog
- * @return a new temporary file for a new sorted oplog. The name consists of
- * bucket name, a sequence number for ordering the files followed by a
- * timestamp
- */
- Hoplog getTmpSortedOplog(Integer seqNum, String extension) throws IOException {
- if (seqNum == null) {
- seqNum = sequence.incrementAndGet();
- }
- String name = bucketId + "-" + System.currentTimeMillis() + "-" + seqNum
- + extension;
- Path soplogPath = new Path(bucketPath, name + TEMP_HOPLOG_EXTENSION);
- return getHoplog(soplogPath);
- }
-
- /**
- * renames a temporary hoplog file to a legitimate name.
- */
- static void makeLegitimate(Hoplog so) throws IOException {
- String name = so.getFileName();
- assert name.endsWith(TEMP_HOPLOG_EXTENSION);
-
- int index = name.lastIndexOf(TEMP_HOPLOG_EXTENSION);
- name = name.substring(0, index);
- so.rename(name);
- }
-
- /**
- * creates a expiry marker for a file on file system
- *
- * @param hoplog
- * @throws IOException
- */
- protected void addExpiryMarkerForAFile(Hoplog hoplog) throws IOException {
- FileSystem fs = store.getFileSystem();
-
- // TODO optimization needed here. instead of creating expired marker
- // file per file, create a meta file. the main thing to worry is
- // compaction of meta file itself
- Path expiryMarker = getExpiryMarkerPath(hoplog.getFileName());
-
- // uh-oh, why are we trying to expire an already expired file?
- if (ENABLE_INTEGRITY_CHECKS) {
- Assert.assertTrue(!fs.exists(expiryMarker),
- "Expiry marker already exists: " + expiryMarker);
- }
-
- FSDataOutputStream expiryMarkerFile = fs.create(expiryMarker);
- expiryMarkerFile.close();
-
- if (logger.isDebugEnabled())
- logger.debug("Hoplog marked expired: " + getPathStr(hoplog));
- }
-
- protected Path getExpiryMarkerPath(String name) {
- return new Path(bucketPath, name + EXPIRED_HOPLOG_EXTENSION);
- }
-
- protected String truncateExpiryExtension(String name) {
- if (name.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
- return name.substring(0, name.length() - EXPIRED_HOPLOG_EXTENSION.length());
- }
-
- return name;
- }
-
- /**
- * updates region stats and a local copy of bucket level store usage metric.
- *
- * @param delta
- */
- protected void incrementDiskUsage(long delta) {
- long newSize = bucketDiskUsage.addAndGet(delta);
- if (newSize < 0 && delta < 0) {
- if (logger.isDebugEnabled()){
- logger.debug("{}Invalid diskUsage size:" + newSize + " caused by delta:"
- + delta + ", parallel del & close?" + isClosed(), logPrefix);
- }
- if (isClosed()) {
- // avoid corrupting disk usage size during close by reducing residue
- // size only
- delta = delta + (-1 * newSize);
- }
- }
- stats.incStoreUsageBytes(delta);
- }
-
- /**
-  * Removes from the list of valid files every file for which an expiry marker
-  * exists. A marker belongs to a file when the marker's name equals the file's
-  * name followed by the expired-hoplog extension.
-  *
-  * @param valid
-  *          list of valid files; may be null
-  * @param expired
-  *          list of expired file markers; may be null
-  * @return null if {@code valid} is null, {@code valid} itself if
-  *         {@code expired} is null, otherwise a new array with the valid files
-  *         that do not have an expired file marker, in their original order
-  */
- public static FileStatus[] filterValidHoplogs(FileStatus[] valid,
-     FileStatus[] expired) {
-   if (valid == null) {
-     return null;
-   }
-
-   if (expired == null) {
-     return valid;
-   }
-
-   ArrayList<FileStatus> result = new ArrayList<FileStatus>(valid.length);
-   for (FileStatus vs : valid) {
-     // the marker name depends only on the valid file, so build it once
-     final String markerName = vs.getPath().getName()
-         + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-
-     boolean found = false;
-     for (FileStatus ex : expired) {
-       if (ex.getPath().getName().equals(markerName)) {
-         found = true;
-         // one marker is enough; no need to look at the remaining ones
-         break;
-       }
-     }
-     if (!found) {
-       result.add(vs);
-     }
-   }
-
-   return result.toArray(new FileStatus[result.size()]);
- }
-
- protected void pingSecondaries() throws ForceReattemptException {
-
- if (JUNIT_TEST_RUN)
- return;
- BucketRegion br = ((PartitionedRegion)this.regionManager.getRegion()).getDataStore().getLocalBucketById(this.bucketId);
- boolean secondariesPingable = false;
- try {
- secondariesPingable = br.areSecondariesPingable();
- } catch (Throwable e) {
- throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
- this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()), e);
- }
- if (!secondariesPingable)
- throw new ForceReattemptException("Failed to ping secondary servers of bucket: " +
- this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()));
- }
-
-
-
-
- /**
-  * Orders soplogs by the information encoded in their file names. File names
-  * are assigned incrementally and hint at the age of the file.
-  */
- public static final class HoplogComparator implements
-     Comparator<TrackedReference<Hoplog>> {
-   /**
-    * Delegates to the hoplogs' own ordering: a file with a higher sequence or
-    * timestamp is the younger one and hence the smaller.
-    */
-   @Override
-   public int compare(TrackedReference<Hoplog> o1, TrackedReference<Hoplog> o2) {
-     final Hoplog first = o1.get();
-     final Hoplog second = o2.get();
-     return first.compareTo(second);
-   }
-
-   /**
-    * Compares the age of two files based on their names.
-    *
-    * @return 1 if name1 is older, -1 if name1 is younger; when sequence and
-    *         timestamp are equal, the result of comparing the file extensions
-    */
-   public static int compareByName(String name1, String name2) {
-     return new HoplogDescriptor(name1).compareTo(new HoplogDescriptor(name2));
-   }
- }
-
- /**
-  * @param matcher
-  *          a matcher that has already been matched against a hoplog file name
-  * @return the timestamp parsed from capture group 2 of the match
-  */
- public static long getHoplogTimestamp(Matcher matcher) {
-   final String timestampText = matcher.group(2);
-   return Long.parseLong(timestampText);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
deleted file mode 100644
index 86e66a1..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-/**
- * Membership test over byte-array keys that may report false positives but
- * never false negatives.
- */
-public interface BloomFilter {
- /**
- * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
- * is such that false positives are allowed, but false negatives cannot occur.
- */
- boolean mightContain(byte[] key);
-
- /**
- * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
- * is such that false positives are allowed, but false negatives cannot occur.
- *
- * NOTE(review): presumably only the keyLength bytes of key starting at keyOffset
- * form the key to test - confirm against the implementations.
- */
- boolean mightContain(byte[] key, int keyOffset, int keyLength);
-
- /**
- * @return Size of the bloom, in bytes
- */
- long getBloomSize();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
deleted file mode 100644
index 3f67de8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.Collection;
-
-import org.apache.hadoop.fs.FileSystem;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Timer task for the streaming case. If bucket traffic stops after a few
- * batches have been written, flush is not called again and the file stays in
- * tmp state until flushing resumes. This task periodically walks the buckets
- * and closes their writer once the rollover time has passed.
- *
- * It is also responsible for fixing the sizes of files that were not closed
- * properly last time.
- */
-class CloseTmpHoplogsTimerTask extends SystemTimerTask {
-
-  private static final Logger logger = LogService.getLogger();
-
-  private HdfsRegionManager hdfsRegionManager;
-  private FileSystem filesystem;
-
-  /**
-   * Creates the task together with a file system of its own.
-   *
-   * Why a separate file system: for HDFS, if a file wasn't closed properly last
-   * time, calling FileSystem.append for it runs
-   * FSNamesystem.startFileInternal -> FSNamesystem.recoverLeaseInternal, which
-   * throws AlreadyBeingCreatedException if there is an open handle, to any
-   * other file, created using the same FileSystem object. This is a bug tracked
-   * at:
-   * https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
-   *
-   * The fix for this bug is not yet part of Pivotal HD, so the timer task uses
-   * its own file system to stay clear of it.
-   */
-  public CloseTmpHoplogsTimerTask(HdfsRegionManager hdfsRegionManager) {
-    this.hdfsRegionManager = hdfsRegionManager;
-    this.filesystem = this.hdfsRegionManager.getStore().createFileSystem();
-    if (logger.isDebugEnabled()) {
-      logger.debug("created a new file system specifically for timer task");
-    }
-  }
-
-  /**
-   * Visits every bucket organizer and closes its writer when the time since the
-   * last flush has reached the rollover interval. Also repairs tmp files left
-   * over from an earlier unsuccessful run. The run stops at the first bucket
-   * for which the region's readiness check fails.
-   */
-  @Override
-  public void run2() {
-    final Collection<HoplogOrganizer> organizers = hdfsRegionManager.getBucketOrganizers();
-    if (logger.isDebugEnabled()) {
-      logger.debug("Starting the close temp logs run.");
-    }
-
-    for (HoplogOrganizer organizer : organizers) {
-      final HDFSUnsortedHoplogOrganizer unsorted = (HDFSUnsortedHoplogOrganizer) organizer;
-      final long elapsedMillis = System.currentTimeMillis() - unsorted.getLastFlushTime();
-      final long timeSinceLastFlush = elapsedMillis / 1000;
-
-      try {
-        this.hdfsRegionManager.getRegion().checkReadiness();
-      } catch (Exception e) {
-        // region is not ready; abandon the rest of this run
-        return;
-      }
-
-      try {
-        // the time since last flush has reached the file rollover interval, so
-        // roll over the file
-        if (timeSinceLastFlush >= unsorted.getfileRolloverInterval()) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Closing writer for bucket: " + unsorted.bucketId);
-          }
-          unsorted.synchronizedCloseWriter(false, timeSinceLastFlush, 0);
-        }
-
-        // fix the tmp hoplogs, if any, using the task's own file system
-        unsorted.identifyAndFixTmpHoplogs(this.filesystem);
-      } catch (Exception e) {
-        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
-      }
-    }
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
deleted file mode 100644
index 55d8f87..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Status of a compaction task, reported in the future returned for that task. Carries the
- * bucket id and a boolean status flag. On the wire the bucket id is written as an int,
- * followed by the status as a boolean.
- */
-public class CompactionStatus implements VersionedDataSerializable {
-  /** MergeGemXDHDFSToGFE check and verify serializationversions */
-  private static Version[] serializationVersions = new Version[] { Version.GFE_81 };
-
-  private int bucketId;
-  private boolean status;
-
-  /** Creates an instance whose fields hold their default values (bucket 0, status false). */
-  public CompactionStatus() {
-  }
-
-  /**
-   * @param bucketId id of the bucket this status refers to
-   * @param status status flag to report for that bucket
-   */
-  public CompactionStatus(int bucketId, boolean status) {
-    this.bucketId = bucketId;
-    this.status = status;
-  }
-
-  /** @return id of the bucket this status refers to */
-  public int getBucketId() {
-    return this.bucketId;
-  }
-
-  /** @return the status flag carried by this object */
-  public boolean isStatus() {
-    return this.status;
-  }
-
-  /** Writes the bucket id (int) and then the status (boolean) to {@code out}. */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    out.writeInt(this.bucketId);
-    out.writeBoolean(this.status);
-  }
-
-  /** Reads the bucket id (int) and then the status (boolean) from {@code in}. */
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    final int id = in.readInt();
-    final boolean flag = in.readBoolean();
-    this.bucketId = id;
-    this.status = flag;
-  }
-
-  /** @return the shared array of serialization versions declared by this class */
-  @Override
-  public Version[] getSerializationVersions() {
-    return serializationVersions;
-  }
-
-  /** @return canonical class name, identity hash code, bucket id and status in one line */
-  @Override
-  public String toString() {
-    return getClass().getCanonicalName() + "@" + System.identityHashCode(this)
-        + " Bucket:" + this.bucketId + " status:" + this.status;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
deleted file mode 100644
index 84beded..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Reports the result of a flush request for one bucket. A bucket id equal to the
- * {@code LAST} sentinel (-1) marks the instance produced by {@link #last()}.
- * On the wire only the bucket id is written, as an int.
- */
-public class FlushStatus implements VersionedDataSerializable {
-  /** Sentinel bucket id used by {@link #last()} and tested by {@link #isLast()}. */
-  private final static int LAST = -1;
-
-  private static Version[] serializationVersions = new Version[] { Version.GFE_81 };
-
-  private int bucketId;
-
-  /** Creates an instance whose bucket id holds its default value (0). */
-  public FlushStatus() {
-  }
-
-  /** @param bucketId id of the bucket this status refers to */
-  public FlushStatus(int bucketId) {
-    this.bucketId = bucketId;
-  }
-
-  /** @return a new status whose bucket id is the {@code LAST} sentinel */
-  public static FlushStatus last() {
-    return new FlushStatus(LAST);
-  }
-
-  /** @return id of the bucket this status refers to */
-  public int getBucketId() {
-    return this.bucketId;
-  }
-
-  /** @return true when the bucket id equals the {@code LAST} sentinel */
-  public boolean isLast() {
-    return LAST == this.bucketId;
-  }
-
-  /** Writes the bucket id (int) to {@code out}. */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    out.writeInt(this.bucketId);
-  }
-
-  /** Reads the bucket id (int) from {@code in}. */
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    final int id = in.readInt();
-    this.bucketId = id;
-  }
-
-  /** @return the shared array of serialization versions declared by this class */
-  @Override
-  public Version[] getSerializationVersions() {
-    return serializationVersions;
-  }
-
-  /** @return canonical class name, identity hash code and bucket id in one line */
-  @Override
-  public String toString() {
-    return getClass().getCanonicalName() + "@" + System.identityHashCode(this)
-        + " Bucket:" + this.bucketId;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
deleted file mode 100644
index ba191c2..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * Schedules compaction of hoplogs owned by this node as primary and manages the executors of
- * ongoing compactions. One manager instance is kept per HDFS store name (see
- * {@link #getInstance(HDFSStore)}). Ideally the number of pending requests will not exceed the
- * number of buckets in the node, as the hoplog organizer avoids creating a new request if
- * compaction on the bucket is active. Separate executors for major and minor compactions are
- * maintained to prevent long running major compactions from blocking minor compactions.
- */
-public class HDFSCompactionManager {
-  /*
-   * Each hdfs store has its own concurrency configuration. Concurrency
-   * configuration is used by compaction manager to manage threads. This member
-   * holds the hdfs-store name to compaction manager mapping.
-   */
-  private static final ConcurrentHashMap<String, HDFSCompactionManager> storeToManagerMap =
-      new ConcurrentHashMap<String, HDFSCompactionManager>();
-
-  // hdfs store configuration used to initialize this instance
-  HDFSStore storeConfig;
-
-  // Executor for ordered execution of minor compaction requests.
-  private final CompactionExecutor minorCompactor;
-  // Executor for ordered execution of major compaction requests.
-  private final CompactionExecutor majorCompactor;
-
-  private static final Logger logger = LogService.getLogger();
-  protected final static String logPrefix = "<" + "HDFSCompactionManager" + "> ";
-
-  /**
-   * Builds the minor and major executors from the store's thread counts. Both executors share
-   * the queue capacity read from the {@code HoplogConfig.COMPCATION_QUEUE_CAPACITY} system
-   * property (falling back to its default).
-   */
-  private HDFSCompactionManager(HDFSStore config) {
-    this.storeConfig = config;
-    // configure hdfs compaction manager
-    int capacity = Integer.getInteger(HoplogConfig.COMPCATION_QUEUE_CAPACITY,
-        HoplogConfig.COMPCATION_QUEUE_CAPACITY_DEFAULT);
-
-    // core-thread timeout is enabled inside the CompactionExecutor constructor
-    minorCompactor = new CompactionExecutor(config.getMinorCompactionThreads(), capacity,
-        "MinorCompactor_" + config.getName());
-    majorCompactor = new CompactionExecutor(config.getMajorCompactionThreads(), capacity,
-        "MajorCompactor_" + config.getName());
-  }
-
-  /**
-   * Returns the manager registered under the store's name, creating and registering one on
-   * first use.
-   *
-   * @param config hdfs store configuration; its name is the lookup key
-   * @return the manager for that store name
-   */
-  public static synchronized HDFSCompactionManager getInstance(HDFSStore config) {
-    HDFSCompactionManager instance = storeToManagerMap.get(config.getName());
-    if (instance == null) {
-      instance = new HDFSCompactionManager(config);
-      storeToManagerMap.put(config.getName(), instance);
-    }
-
-    return instance;
-  }
-
-  /**
-   * Accepts a compaction request for asynchronous execution on the major or minor executor,
-   * depending on the request.
-   *
-   * @param request
-   *          compaction request with region and bucket id
-   * @return a future for the compaction status if the request was accepted; {@code null} if
-   *         the request is not forced and its compactor is busy, or if the executor refused or
-   *         failed to accept it (compaction disabled, pool overloaded, or any other failure)
-   */
-  public synchronized Future<CompactionStatus> submitRequest(CompactionRequest request) {
-    if (!request.isForced && request.compactor.isBusy(request.isMajor)) {
-      if (logger.isDebugEnabled()) {
-        fineLog("Compactor is busy. Ignoring ", request);
-      }
-      return null;
-    }
-
-    CompactionExecutor executor = request.isMajor ? majorCompactor : minorCompactor;
-
-    try {
-      return executor.submit(request);
-    } catch (CompactionIsDisabled e) {
-      if (logger.isDebugEnabled()) {
-        // fineLog adds the log prefix itself; pass only the message
-        fineLog(e.getMessage());
-      }
-    } catch (Throwable e) {
-      logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, "Compaction request submission failed"), e);
-    }
-    return null;
-  }
-
-  /**
-   * Removes all pending compaction requests. Programmed for TESTING ONLY
-   */
-  public void reset() {
-    minorCompactor.shutdownNow();
-    majorCompactor.shutdownNow();
-    HDFSCompactionManager.storeToManagerMap.remove(storeConfig.getName());
-  }
-
-  /**
-   * Returns minor compactor. Programmed for TESTING AND MONITORING ONLY
-   */
-  public ThreadPoolExecutor getMinorCompactor() {
-    return minorCompactor;
-  }
-
-  /**
-   * Returns major compactor. Programmed for TESTING AND MONITORING ONLY
-   */
-  public ThreadPoolExecutor getMajorCompactor() {
-    return majorCompactor;
-  }
-
-  /**
-   * Contains important details needed for executing a compaction cycle. Two requests are equal
-   * when they have the same region folder and bucket; the major/forced/version-upgrade flags
-   * do not take part in {@code equals} or {@code hashCode}.
-   */
-  public static class CompactionRequest implements Callable<CompactionStatus> {
-    String regionFolder;
-    int bucket;
-    Compactor compactor;
-    boolean isMajor;
-    final boolean isForced;
-    final boolean versionUpgrade;
-
-    /** Creates a request that is neither forced nor a version upgrade. */
-    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major) {
-      this(regionFolder, bucket, compactor, major, false);
-    }
-
-    /** Creates a request that is not a version upgrade. */
-    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced) {
-      this(regionFolder, bucket, compactor, major, isForced, false);
-    }
-
-    /**
-     * @param regionFolder region folder the bucket belongs to
-     * @param bucket bucket id
-     * @param compactor compactor that performs the work
-     * @param major true for a major compaction, false for a minor one
-     * @param isForced true to run even when automatic compaction is disabled
-     * @param versionUpgrade passed through to the compactor's compact call
-     */
-    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced, boolean versionUpgrade) {
-      this.regionFolder = regionFolder;
-      this.bucket = bucket;
-      this.compactor = compactor;
-      this.isMajor = major;
-      this.isForced = isForced;
-      this.versionUpgrade = versionUpgrade;
-    }
-
-    /**
-     * Runs the compaction unless this is a non-forced request and the matching (major or
-     * minor) compaction is disabled on the store.
-     *
-     * @return status for this bucket; the flag is false when the request was ignored or the
-     *         compactor threw an {@link IOException} (which is logged as an error)
-     */
-    @Override
-    public CompactionStatus call() throws Exception {
-      HDFSStore store = compactor.getHdfsStore();
-      if (!isForced) {
-        // this is an auto generated compaction request. If auto compaction is
-        // disabled, ignore this call.
-        if (isMajor && !store.getMajorCompaction()) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}Major compaction is disabled. Ignoring request",logPrefix);
-          }
-          return new CompactionStatus(bucket, false);
-        } else if (!isMajor && !store.getMinorCompaction()) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}Minor compaction is disabled. Ignoring request", logPrefix);
-          }
-          return new CompactionStatus(bucket, false);
-        }
-      }
-
-      // all hurdles passed, execute compaction now
-      try {
-        boolean status = compactor.compact(isMajor, versionUpgrade);
-        return new CompactionStatus(bucket, status);
-      } catch (IOException e) {
-        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_COMPACTION_ERROR, bucket), e);
-      }
-      return new CompactionStatus(bucket, false);
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + bucket;
-      result = prime * result
-          + ((regionFolder == null) ? 0 : regionFolder.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null || getClass() != obj.getClass()) {
-        return false;
-      }
-      CompactionRequest other = (CompactionRequest) obj;
-      if (bucket != other.bucket) {
-        return false;
-      }
-      if (regionFolder == null) {
-        return other.regionFolder == null;
-      }
-      return regionFolder.equals(other.regionFolder);
-    }
-
-    @Override
-    public String toString() {
-      return "CompactionRequest [regionFolder=" + regionFolder + ", bucket="
-          + bucket + ", isMajor=" + isMajor + ", isForced="+isForced+"]";
-    }
-  }
-
-  /**
-   * Helper class for creating named instances of compaction threads and managing the
-   * compaction executor. Threads are daemon threads named {@code <name>:<counter>}; idle
-   * threads, core threads included, are allowed to time out after 5 seconds.
-   */
-  private class CompactionExecutor extends ThreadPoolExecutor implements ThreadFactory {
-    final AtomicInteger count = new AtomicInteger(1);
-    private String name;
-
-    /**
-     * @param max used as both the core and the maximum pool size
-     * @param capacity capacity of the bounded request queue
-     * @param name prefix used for thread names and in rejection messages
-     */
-    CompactionExecutor(int max, int capacity, String name) {
-      super(max, max, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(capacity));
-      allowCoreThreadTimeOut(true);
-      setThreadFactory(this);
-      this.name = name;
-    }
-
-    /**
-     * Rejects a non-forced request when the matching (major or minor) compaction is disabled
-     * on the store.
-     *
-     * @throws CompactionIsDisabled if the request is not forced and compaction is disabled
-     */
-    private void throwIfStopped(CompactionRequest req, HDFSStore storeConfig) {
-      // check if compaction is enabled everytime. Alter may change this value
-      // so this check is needed everytime
-      boolean isEnabled = req.isMajor
-          ? storeConfig.getMajorCompaction()
-          : storeConfig.getMinorCompaction();
-      if (isEnabled || req.isForced) {
-        return;
-      }
-      throw new CompactionIsDisabled(name + " is disabled");
-    }
-
-    /**
-     * Brings the core pool size in line with the store's currently configured thread count
-     * and rejects a non-forced request while more threads are active than that count allows.
-     *
-     * @throws CompactionIsDisabled if the request is not forced and the active thread count
-     *         exceeds the configured thread count
-     */
-    private void throwIfPoolSizeChanged(CompactionRequest task, HDFSStore config) {
-      int threadCount = task.isMajor
-          ? config.getMajorCompactionThreads()
-          : config.getMinorCompactionThreads();
-
-      // NOTE(review): only the core size is adjusted; the maximum pool size keeps the value
-      // given at construction. Newer JDKs reject a core size above the maximum - confirm
-      // whether the maximum should be resized as well.
-      if (getCorePoolSize() != threadCount) {
-        setCorePoolSize(threadCount);
-      }
-
-      if (!task.isForced && getActiveCount() > threadCount) {
-        // the number of active threads is more than the new pool size. Throw
-        // an error if this is a system generated compaction request
-        throw new CompactionIsDisabled(
-            "Rejecting to reduce the number of threads for " + name
-            + ", currently:" + getActiveCount() + " target:"
-            + threadCount);
-      }
-    }
-
-    /**
-     * Validates the request against the current store configuration and then hands it to the
-     * pool. The task is cast to {@link CompactionRequest}.
-     */
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-      throwIfStopped((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
-      throwIfPoolSizeChanged((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
-
-      if (logger.isDebugEnabled()) {
-        fineLog("New:", task, " pool:", getPoolSize(), " active:", getActiveCount());
-      }
-      return super.submit(task);
-    }
-
-    /** Creates a daemon thread named {@code <name>:<counter>}. */
-    @Override
-    public Thread newThread(Runnable r) {
-      Thread thread = new Thread(r, name + ":" + count.getAndIncrement());
-      thread.setDaemon(true);
-      if (logger.isDebugEnabled()) {
-        fineLog("New thread:", name, " poolSize:", getPoolSize(),
-            " active:", getActiveCount());
-      }
-      return thread;
-    }
-  }
-
-  /** Thrown by the compaction executor when it refuses a non-forced request. */
-  public static class CompactionIsDisabled extends RejectedExecutionException {
-    private static final long serialVersionUID = 1L;
-    public CompactionIsDisabled(String name) {
-      super(name);
-    }
-  }
-
-  /**
-   * Logs the concatenation of {@code strings} at debug level, preceded by the log prefix.
-   * Null elements are rendered as "null". The text is passed as a log parameter rather than
-   * as part of the format string, so any "{}" it contains is logged verbatim.
-   */
-  private void fineLog(Object... strings) {
-    if (logger.isDebugEnabled()) {
-      StringBuilder sb = new StringBuilder();
-      for (Object str : strings) {
-        sb.append(str);
-      }
-      logger.debug("{}{}", logPrefix, sb.toString());
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
deleted file mode 100644
index 36e171b..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Defines the arguments to the flush queue request: the set of bucket ids and a maximum
- * wait time in milliseconds. On the wire the bucket set is written first, followed by the
- * wait time as a long.
- */
-@SuppressWarnings("serial")
-public class HDFSFlushQueueArgs implements VersionedDataSerializable {
-
-  private static Version[] serializationVersions = new Version[] { Version.GFE_81 };
-
-  private HashSet<Integer> buckets;
-
-  private long maxWaitTimeMillis;
-
-  /** Creates an instance with no bucket set (null) and a wait time of 0. */
-  public HDFSFlushQueueArgs() {
-  }
-
-  /**
-   * @param buckets bucket ids; copied into a new set
-   * @param maxWaitTime maximum wait time in milliseconds
-   */
-  public HDFSFlushQueueArgs(Set<Integer> buckets, long maxWaitTime) {
-    this.maxWaitTimeMillis = maxWaitTime;
-    this.buckets = new HashSet<Integer>(buckets);
-  }
-
-  /** Writes the bucket set and then the wait time (long) to {@code out}. */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    DataSerializer.writeHashSet(this.buckets, out);
-    out.writeLong(this.maxWaitTimeMillis);
-  }
-
-  /** Reads the bucket set and then the wait time (long) from {@code in}. */
-  @Override
-  public void fromData(DataInput in) throws IOException,
-      ClassNotFoundException {
-    this.buckets = DataSerializer.readHashSet(in);
-    this.maxWaitTimeMillis = in.readLong();
-  }
-
-  /** @return the shared array of serialization versions declared by this class */
-  @Override
-  public Version[] getSerializationVersions() {
-    return serializationVersions;
-  }
-
-  /** @return the internal bucket set itself (not a copy) */
-  public Set<Integer> getBuckets() {
-    return this.buckets;
-  }
-
-  /** Replaces the bucket set with a copy of {@code buckets}. */
-  public void setBuckets(Set<Integer> buckets) {
-    final HashSet<Integer> copy = new HashSet<Integer>(buckets);
-    this.buckets = copy;
-  }
-
-  /** @return true when the maximum wait time is 0 */
-  public boolean isSynchronous() {
-    return 0 == this.maxWaitTimeMillis;
-  }
-
-  /** @return the maximum wait time in milliseconds */
-  public long getMaxWaitTime() {
-    return this.maxWaitTimeMillis;
-  }
-
-  /** @return canonical class name, identity hash code, buckets and wait time in one line */
-  @Override
-  public String toString() {
-    return getClass().getCanonicalName() + "@" + System.identityHashCode(this)
-        + " buckets:" + this.buckets
-        + " maxWaitTime:" + this.maxWaitTimeMillis;
-  }
-}