Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:07 UTC
[21/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
new file mode 100644
index 0000000..cdf7452
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+public class HDFSFlushQueueFunction implements Function, InternalEntity {
+ private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3);
+ private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");
+ private static final Logger logger = LogService.getLogger();
+ private static final String ID = HDFSFlushQueueFunction.class.getName();
+
+ public static void flushQueue(PartitionedRegion pr, int maxWaitTime) {
+
+ Set<Integer> buckets = new HashSet<Integer>(pr.getRegionAdvisor().getBucketSet());
+
+ maxWaitTime *= 1000;
+ long start = System.currentTimeMillis();
+
+ int retries = 0;
+ long remaining = 0;
+ while (retries++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitTime)) > 0) {
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets
+ + ", attempt = " + retries
+ + ", remaining = " + remaining));
+ }
+
+ HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);
+
+ HDFSFlushQueueResultCollector rc = new HDFSFlushQueueResultCollector(buckets);
+ AbstractExecution exec = (AbstractExecution) FunctionService
+ .onRegion(pr)
+ .withArgs(args)
+ .withCollector(rc);
+ exec.setWaitOnExceptionFlag(true);
+
+ try {
+ exec.execute(ID);
+ if (rc.getResult()) {
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully"));
+ }
+ return;
+ }
+ } catch (FunctionException e) {
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e);
+ }
+ }
+
+ buckets.removeAll(rc.getSuccessfulBuckets());
+ for (int bucketId : buckets) {
+ remaining = waitTime(start, maxWaitTime);
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId));
+ }
+ pr.getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper((int) remaining));
+ }
+ }
+
+ pr.checkReadiness();
+ throw new FunctionException("Unable to flush the following buckets: " + buckets);
+ }
+
+ private static long waitTime(long start, long max) {
+ if (max == 0) {
+ return Integer.MAX_VALUE;
+ }
+ return start + max - System.currentTimeMillis();
+ }
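+ // Illustrative usage (a sketch, not part of the original change): flush the
+ // HDFS queue of a partitioned region, blocking for up to 30 seconds:
+ //   HDFSFlushQueueFunction.flushQueue(pr, 30);
+ // Passing 0 makes each attempt effectively unbounded, since waitTime() then
+ // reports Integer.MAX_VALUE remaining (the MAX_RETRIES cap still applies).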
+
+ @Override
+ public void execute(FunctionContext context) {
+ RegionFunctionContext rfc = (RegionFunctionContext) context;
+ PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
+
+ HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) rfc.getArguments();
+ Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
+ buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
+
+ Map<Integer, AsyncFlushResult> flushes = new HashMap<Integer, AsyncFlushResult>();
+ for (int bucketId : buckets) {
+ try {
+ HDFSBucketRegionQueue brq = getQueue(pr, bucketId);
+ if (brq != null) {
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId));
+ }
+ flushes.put(bucketId, brq.flush());
+ }
+ } catch (ForceReattemptException e) {
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e);
+ }
+ }
+ }
+
+ try {
+ long start = System.currentTimeMillis();
+ for (Map.Entry<Integer, AsyncFlushResult> flush : flushes.entrySet()) {
+ long remaining = waitTime(start, args.getMaxWaitTime());
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + flush.getKey()
+ + " to complete flushing, remaining = " + remaining));
+ }
+
+ if (flush.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + flush.getKey() + " flushed successfully"));
+ }
+ rfc.getResultSender().sendResult(new FlushStatus(flush.getKey()));
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result"));
+ }
+ rfc.getResultSender().lastResult(FlushStatus.last());
+ }
+
+ private HDFSBucketRegionQueue getQueue(PartitionedRegion pr, int bucketId)
+ throws ForceReattemptException {
+ AsyncEventQueueImpl aeq = pr.getHDFSEventQueue();
+ AbstractGatewaySender gw = (AbstractGatewaySender) aeq.getSender();
+ AbstractGatewaySenderEventProcessor ep = gw.getEventProcessor();
+ if (ep == null) {
+ return null;
+ }
+
+ ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue) ep.getQueue();
+ return queue.getBucketRegionQueue(pr, bucketId);
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+
+ public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
+ private final CountDownLatch complete;
+ private final Set<Integer> expectedBuckets;
+ private final Set<Integer> successfulBuckets;
+
+ private volatile ReplyProcessor21 processor;
+
+ public HDFSFlushQueueResultCollector(Set<Integer> expectedBuckets) {
+ this.expectedBuckets = expectedBuckets;
+
+ complete = new CountDownLatch(1);
+ successfulBuckets = new HashSet<Integer>();
+ }
+
+ public Set<Integer> getSuccessfulBuckets() {
+ synchronized (successfulBuckets) {
+ return new HashSet<Integer>(successfulBuckets);
+ }
+ }
+
+ @Override
+ public Boolean getResult() throws FunctionException {
+ try {
+ complete.await();
+ synchronized (successfulBuckets) {
+ LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n();
+ if (logger.fineEnabled() || VERBOSE) {
+ logger.info(LocalizedStrings.DEBUG, "Expected buckets: " + expectedBuckets);
+ logger.info(LocalizedStrings.DEBUG, "Successful buckets: " + successfulBuckets);
+ }
+ return expectedBuckets.equals(successfulBuckets);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
+ throw new FunctionException(e);
+ }
+ }
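+ // getResult() reports overall success only when every expected bucket sent
+ // back a FlushStatus; flushQueue(...) above then removes the successful
+ // buckets from its working set and retries only the remainder.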
+
+ @Override
+ public Boolean getResult(long timeout, TimeUnit unit)
+ throws FunctionException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized void addResult(DistributedMember memberID, Object result) {
+ if (result instanceof FlushStatus) {
+ FlushStatus status = (FlushStatus) result;
+ if (!status.isLast()) {
+ synchronized (successfulBuckets) {
+ successfulBuckets.add(status.getBucketId());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void endResults() {
+ complete.countDown();
+ }
+
+ @Override
+ public void clearResults() {
+ }
+
+ @Override
+ public void setProcessor(ReplyProcessor21 processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public ReplyProcessor21 getProcessor() {
+ return processor;
+ }
+
+ @Override
+ public void setException(Throwable exception) {
+ // no-op: this collector does not track exceptions
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
new file mode 100644
index 0000000..ec0f9ff
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Arguments passed to the HDFSForceCompactionFunction
+ *
+ */
+@SuppressWarnings("serial")
+public class HDFSForceCompactionArgs implements VersionedDataSerializable {
+
+ private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+ private HashSet<Integer> buckets;
+
+ private boolean isMajor;
+
+ private int maxWaitTime;
+
+ public HDFSForceCompactionArgs() {
+ }
+
+ public HDFSForceCompactionArgs(Set<Integer> buckets, boolean isMajor, Integer maxWaitTime) {
+ this.buckets = new HashSet<Integer>(buckets);
+ this.isMajor = isMajor;
+ this.maxWaitTime = maxWaitTime;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ DataSerializer.writeHashSet(buckets, out);
+ out.writeBoolean(isMajor);
+ out.writeInt(maxWaitTime);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException,
+ ClassNotFoundException {
+ this.buckets = DataSerializer.readHashSet(in);
+ this.isMajor = in.readBoolean();
+ this.maxWaitTime = in.readInt();
+ }
+
+ @Override
+ public Version[] getSerializationVersions() {
+ return serializationVersions;
+ }
+
+ public Set<Integer> getBuckets() {
+ return buckets;
+ }
+
+ public void setBuckets(Set<Integer> buckets) {
+ this.buckets = new HashSet<Integer>(buckets);
+ }
+
+ public boolean isMajor() {
+ return isMajor;
+ }
+
+ public void setMajor(boolean isMajor) {
+ this.isMajor = isMajor;
+ }
+
+ public boolean isSynchronous() {
+ return maxWaitTime == 0;
+ }
+
+ public int getMaxWaitTime() {
+ return this.maxWaitTime;
+ }
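+ // For example (illustrative, values assumed): maxWaitTime == 0 requests a
+ // synchronous compaction with no timeout, while a positive value is treated
+ // by HDFSForceCompactionFunction as a wait bound in milliseconds:
+ //   new HDFSForceCompactionArgs(buckets, true, 0);      // major, block until done
+ //   new HDFSForceCompactionArgs(buckets, false, 30000); // minor, wait up to 30 s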
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(getClass().getCanonicalName()).append("@")
+ .append(System.identityHashCode(this))
+ .append(" buckets:").append(buckets)
+ .append(" isMajor:").append(isMajor)
+ .append(" maxWaitTime:").append(maxWaitTime);
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
new file mode 100644
index 0000000..d26ac1b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Function responsible for forcing a compaction on all members
+ * of the system
+ *
+ */
+@SuppressWarnings("serial")
+public class HDFSForceCompactionFunction implements Function, InternalEntity {
+
+ public static final int FORCE_COMPACTION_MAX_RETRIES = Integer.getInteger("gemfireXD.maxCompactionRetries", 3);
+
+ public static final int BUCKET_ID_FOR_LAST_RESULT = -1;
+
+ public static final String ID = "HDFSForceCompactionFunction";
+
+ private static final Logger logger = LogService.getLogger();
+
+ @Override
+ public void execute(FunctionContext context) {
+ if (context.isPossibleDuplicate()) {
+ // do not re-execute the function, another function
+ // targeting the failed buckets will be invoked
+ context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, false));
+ return;
+ }
+ RegionFunctionContext rfc = (RegionFunctionContext) context;
+ PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
+ HDFSForceCompactionArgs args = (HDFSForceCompactionArgs) rfc.getArguments();
+ Set<Integer> buckets = new HashSet<Integer>(args.getBuckets()); // copying avoids race when the function coordinator
+ // also runs the function locally
+ buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
+
+ List<Future<CompactionStatus>> futures = pr.forceLocalHDFSCompaction(buckets, args.isMajor(), 0);
+ int waitFor = args.getMaxWaitTime();
+ for (Future<CompactionStatus> future : futures) {
+ long start = System.currentTimeMillis();
+ CompactionStatus status = null;
+ try {
+ // TODO use a CompletionService instead
+ if (!args.isSynchronous() && waitFor <= 0) {
+ break;
+ }
+ status = args.isSynchronous() ? future.get() : future.get(waitFor, TimeUnit.MILLISECONDS);
+ buckets.remove(status.getBucketId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("HDFS: ForceCompaction sending result:"+status);
+ }
+ context.getResultSender().sendResult(status);
+ long elapsedTime = System.currentTimeMillis() - start;
+ waitFor -= elapsedTime;
+ } catch (InterruptedException e) {
+ // send a list of failed buckets after waiting for all buckets
+ } catch (ExecutionException e) {
+ // send a list of failed buckets after waiting for all buckets
+ } catch (TimeoutException e) {
+ // do not wait for other buckets to complete
+ break;
+ }
+ }
+ // for asynchronous invocation, the status is true for buckets that we did not wait for
+ boolean status = !args.isSynchronous();
+ for (Integer bucketId : buckets) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("HDFS: ForceCompaction sending result for bucket:"+bucketId);
+ }
+ context.getResultSender().sendResult(new CompactionStatus(bucketId, status));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("HDFS: ForceCompaction sending last result");
+ }
+ context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, true));
+ }
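+ // A minimal invocation sketch (assumed from the FunctionService API used
+ // elsewhere in this change, not taken verbatim from it):
+ //   HDFSForceCompactionArgs args = new HDFSForceCompactionArgs(buckets, true, 0);
+ //   HDFSForceCompactionResultCollector rc = new HDFSForceCompactionResultCollector();
+ //   FunctionService.onRegion(pr).withArgs(args).withCollector(rc).execute(ID);
+ //   List<CompactionStatus> statuses = rc.getResult();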
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ // run compaction on primary members
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ // so that we can target re-execution on failed buckets
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
new file mode 100644
index 0000000..ee5e4aa
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+
+/**
+ * Result collector for the HDFSForceCompactionFunction. Collects per-bucket
+ * CompactionStatus replies and tracks failed buckets so the coordinator can retry them.
+ */
+public class HDFSForceCompactionResultCollector implements LocalResultCollector<Object, List<CompactionStatus>> {
+
+ /** list of received replies */
+ private List<CompactionStatus> reply = new ArrayList<CompactionStatus>();
+
+ /** latch used to block the caller of getResult() */
+ private CountDownLatch waitForResults = new CountDownLatch(1);
+
+ /** flag set when clearResults() is called to indicate a failure */
+ private volatile boolean shouldRetry;
+
+ private ReplyProcessor21 processor;
+
+ @Override
+ public List<CompactionStatus> getResult() throws FunctionException {
+ try {
+ waitForResults.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
+ throw new FunctionException(e);
+ }
+ return reply;
+ }
+
+ @Override
+ public List<CompactionStatus> getResult(long timeout, TimeUnit unit)
+ throws FunctionException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addResult(DistributedMember memberID,
+ Object resultOfSingleExecution) {
+ if (resultOfSingleExecution instanceof CompactionStatus) {
+ CompactionStatus status = (CompactionStatus) resultOfSingleExecution;
+ if (status.getBucketId() != HDFSForceCompactionFunction.BUCKET_ID_FOR_LAST_RESULT) {
+ reply.add(status);
+ }
+ }
+ }
+
+ @Override
+ public void endResults() {
+ waitForResults.countDown();
+ }
+
+ @Override
+ public void clearResults() {
+ this.shouldRetry = true;
+ waitForResults.countDown();
+ }
+
+ /**
+ * @return true if retry should be attempted
+ */
+ public boolean shouldRetry() {
+ return this.shouldRetry || !getFailedBucketIds().isEmpty();
+ }
+
+ private Set<Integer> getFailedBucketIds() {
+ Set<Integer> result = new HashSet<Integer>();
+ for (CompactionStatus status : reply) {
+ if (!status.isStatus()) {
+ result.add(status.getBucketId());
+ }
+ }
+ return result;
+ }
+
+ public Set<Integer> getSuccessfulBucketIds() {
+ Set<Integer> result = new HashSet<Integer>();
+ for (CompactionStatus status : reply) {
+ if (status.isStatus()) {
+ result.add(status.getBucketId());
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void setProcessor(ReplyProcessor21 processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public ReplyProcessor21 getProcessor() {
+ return this.processor;
+ }
+
+ @Override
+ public void setException(Throwable exception) {
+ // no-op: this collector does not track exceptions
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
new file mode 100644
index 0000000..789fe4d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+/**
+ * Function that returns the oldest major-compaction timestamp among all
+ * the buckets on the members
+ *
+ */
+@SuppressWarnings("serial")
+public class HDFSLastCompactionTimeFunction extends FunctionAdapter implements InternalEntity {
+
+ public static final String ID = "HDFSLastCompactionTimeFunction";
+
+ @Override
+ public void execute(FunctionContext context) {
+ RegionFunctionContext rfc = (RegionFunctionContext) context;
+ PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
+ rfc.getResultSender().lastResult(pr.lastLocalMajorHDFSCompaction());
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean isHA() {
+ return true;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
new file mode 100644
index 0000000..6d70dce
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.cache.GemFireCache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Cache for hoplog organizers associated with buckets of a region. The director creates an
+ * organizer instance on the first get request; it does not read HDFS in advance. Creation of
+ * an organizer depends on file system initialization that takes place outside this class. This
+ * class also provides utility methods to monitor usage and manage bucket sets.
+ *
+ */
+public class HDFSRegionDirector {
+ /*
+ * Maps each region name to its listener and store objects. This map must be populated before file
+ * organizers of a bucket can be created
+ */
+ private final ConcurrentHashMap<String, HdfsRegionManager> regionManagerMap;
+
+ /**
+ * regions of this Gemfire cache are managed by this director. TODO this
+ * should be final and be provided at the time of creation of this instance or
+ * through a cache directory
+ */
+ private GemFireCache cache;
+
+ // singleton instance
+ private static HDFSRegionDirector instance;
+
+ final ScheduledExecutorService janitor;
+ private JanitorTask janitorTask;
+
+ private static final Logger logger = LogService.getLogger();
+ protected static final String logPrefix = "<RegionDirector> ";
+
+
+ private HDFSRegionDirector() {
+ regionManagerMap = new ConcurrentHashMap<String, HDFSRegionDirector.HdfsRegionManager>();
+ janitor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, "HDFSRegionJanitor");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+
+ long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
+ HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+
+ janitorTask = new JanitorTask();
+ janitor.scheduleWithFixedDelay(janitorTask, interval, interval,
+ TimeUnit.SECONDS);
+ }
+
+ public synchronized static HDFSRegionDirector getInstance() {
+ if (instance == null) {
+ instance = new HDFSRegionDirector();
+ }
+ return instance;
+ }
+
+ public HDFSRegionDirector setCache(GemFireCache cache) {
+ this.cache = cache;
+ return this;
+ }
+
+ public GemFireCache getCache() {
+ return this.cache;
+ }
+ /**
+ * Caches the listener, store object and list of organizers associated with
+ * a region. Subsequently, these objects are used each time an organizer is created
+ */
+ public synchronized HdfsRegionManager manageRegion(LocalRegion region, String storeName,
+ HoplogListener listener) {
+
+ HdfsRegionManager manager = regionManagerMap.get(region.getFullPath());
+ if (manager != null) {
+ // This is an attempt to re-register a region, presumably to modify the
+ // listener or hdfs store impl associated with it, so clear the existing
+ // manager first.
+
+ clear(region.getFullPath());
+ }
+
+ HDFSStoreImpl store = HDFSStoreDirector.getInstance().getHDFSStore(storeName);
+ manager = new HdfsRegionManager(region, store, listener, getStatsFactory(), this);
+ regionManagerMap.put(region.getFullPath(), manager);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Now managing region " + region.getFullPath(), logPrefix);
+ }
+
+ return manager;
+ }
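+ // Hypothetical lifecycle sketch (wiring assumed; "myHdfsStore" is a
+ // placeholder store name):
+ //   HdfsRegionManager mgr = HDFSRegionDirector.getInstance()
+ //       .setCache(cache).manageRegion(region, "myHdfsStore", listener);
+ //   HoplogOrganizer<?> organizer = mgr.create(bucketId); // lazy, per bucket
+ //   // ... later, when the PR disowns the region ...
+ //   HDFSRegionDirector.getInstance().clear(region.getFullPath());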
+
+ /**
+ * Find the regions that are part of a particular HDFS store.
+ */
+ public Collection<String> getRegionsInStore(HDFSStore store) {
+ TreeSet<String> regions = new TreeSet<String>();
+ for(Map.Entry<String, HdfsRegionManager> entry : regionManagerMap.entrySet()) {
+ if(entry.getValue().getStore().equals(store)) {
+ regions.add(entry.getKey());
+ }
+ }
+ return regions;
+ }
+
+ public int getBucketCount(String regionPath) {
+ HdfsRegionManager manager = regionManagerMap.get(regionPath);
+ if (manager == null) {
+ throw new IllegalStateException("Region not initialized");
+ }
+
+ return manager.bucketOrganizerMap.size();
+ }
+
+ public void closeWritersForRegion(String regionPath, int minSizeForFileRollover) throws IOException {
+ regionManagerMap.get(regionPath).closeWriters(minSizeForFileRollover);
+ }
+ /**
+ * Removes and closes all {@link HoplogOrganizer}s of this region. This call is expected when
+ * a PR disowns a region.
+ */
+ public synchronized void clear(String regionPath) {
+ HdfsRegionManager manager = regionManagerMap.remove(regionPath);
+ if (manager != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Closing hoplog region manager for " + regionPath, logPrefix);
+ }
+ manager.close();
+ }
+ }
+
+ /**
+ * Closes all region managers, organizers and hoplogs. This method should be
+ * called before closing the cache to gracefully release all resources
+ */
+ public static synchronized void reset() {
+ if (instance == null) {
+ // nothing to reset
+ return;
+ }
+
+ instance.janitor.shutdownNow();
+
+ for (String region : instance.regionManagerMap.keySet()) {
+ instance.clear(region);
+ }
+ instance.cache = null;
+ instance = null;
+ }
+
+ /**
+ * Terminates the current janitor task and schedules a new one. The rate of the new
+ * task is based on the value of the system property at that time
+ */
+ public static synchronized void resetJanitor() {
+ instance.janitorTask.terminate();
+ instance.janitorTask = instance.new JanitorTask();
+ long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
+ HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+ instance.janitor.scheduleWithFixedDelay(instance.janitorTask, 0, interval,
+ TimeUnit.SECONDS);
+ }
+
+ /**
+ * @param regionPath name of region for which stats object is desired
+ * @return {@link SortedOplogStatistics} instance associated with hdfs region
+ * name. Null if region is not managed by director
+ */
+ public synchronized SortedOplogStatistics getHdfsRegionStats(String regionPath) {
+ HdfsRegionManager manager = regionManagerMap.get(regionPath);
+ return manager == null ? null : manager.getHdfsStats();
+ }
+
+ private StatisticsFactory getStatsFactory() {
+ return cache.getDistributedSystem();
+ }
+
+ /**
+ * A helper class to manage a region and its organizers
+ */
+ public static class HdfsRegionManager {
+ // name and store configuration of the region whose buckets are managed by this director.
+ private LocalRegion region;
+ private HDFSStoreImpl store;
+ private HoplogListener listener;
+ private volatile boolean closed = false;
+ private final int FILE_ROLLOVER_TASK_INTERVAL = Integer.parseInt(
+ System.getProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "60"));
+
+ private SystemTimer hoplogCloseTimer = null;
+
+ // instance of hdfs statistics object for this hdfs based region. This
+ // object will collect usage and performance related statistics.
+ private final SortedOplogStatistics hdfsStats;
+
+ /*
+ * An organizer instance is created for each bucket of the region residing on this
+ * node. This member maps each bucket id to its corresponding organizer instance. A lock is used to
+ * manage concurrent writes to the map.
+ */
+ private ConcurrentMap<Integer, HoplogOrganizer> bucketOrganizerMap;
+
+ private HDFSRegionDirector hdfsRegionDirector;
+
+ /**
+ * @param listener
+ * listener of change events like file creation and deletion
+ * @param hdfsRegionDirector the director that created and owns this manager
+ */
+ HdfsRegionManager(LocalRegion region, HDFSStoreImpl store,
+ HoplogListener listener, StatisticsFactory statsFactory, HDFSRegionDirector hdfsRegionDirector) {
+ bucketOrganizerMap = new ConcurrentHashMap<Integer, HoplogOrganizer>();
+ this.region = region;
+ this.listener = listener;
+ this.store = store;
+ this.hdfsStats = new SortedOplogStatistics(statsFactory, "HDFSRegionStatistics", region.getFullPath());
+ this.hdfsRegionDirector = hdfsRegionDirector;
+ }
+
+ public void closeWriters(int minSizeForFileRollover) throws IOException {
+ final long startTime = System.currentTimeMillis();
+ long elapsedTime = 0;
+
+ Collection<HoplogOrganizer> organizers = bucketOrganizerMap.values();
+
+ for (HoplogOrganizer organizer : organizers) {
+
+ try {
+ this.getRegion().checkReadiness();
+ } catch (Exception e) {
+ break;
+ }
+
+ ((HDFSUnsortedHoplogOrganizer)organizer).synchronizedCloseWriter(true, 0,
+ minSizeForFileRollover);
+ }
+
+ }
+
+ public synchronized <T extends PersistedEventImpl> HoplogOrganizer<T> create(int bucketId) throws IOException {
+ assert !bucketOrganizerMap.containsKey(bucketId);
+
+ HoplogOrganizer<?> organizer = region.getHDFSWriteOnly()
+ ? new HDFSUnsortedHoplogOrganizer(this, bucketId)
+ : new HdfsSortedOplogOrganizer(this, bucketId);
+
+ bucketOrganizerMap.put(bucketId, organizer);
+ // initialize a timer that periodically closes the hoplog writer if the
+ // time for rollover has passed. It also has the responsibility to fix the files.
+ if (this.region.getHDFSWriteOnly() &&
+ hoplogCloseTimer == null) {
+ hoplogCloseTimer = new SystemTimer(hdfsRegionDirector.
+ getCache().getDistributedSystem(), true);
+
+ // schedule the task to fix the files that were not closed properly
+ // last time.
+ hoplogCloseTimer.scheduleAtFixedRate(new CloseTmpHoplogsTimerTask(this),
+ 1000, FILE_ROLLOVER_TASK_INTERVAL * 1000);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Schedulng hoplog rollover timer with interval "+ FILE_ROLLOVER_TASK_INTERVAL +
+ " for hoplog organizer for " + region.getFullPath()
+ + ":" + bucketId + " " + organizer, logPrefix);
+ }
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Constructed hoplog organizer for " + region.getFullPath()
+ + ":" + bucketId + " " + organizer, logPrefix);
+ }
+ return (HoplogOrganizer<T>) organizer;
+ }
+
+ public synchronized <T extends PersistedEventImpl> void addOrganizer(
+ int bucketId, HoplogOrganizer<T> organizer) {
+ if (bucketOrganizerMap.containsKey(bucketId)) {
+ throw new IllegalArgumentException();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}added pre constructed organizer " + region.getFullPath()
+ + ":" + bucketId + " " + organizer, logPrefix);
+ }
+ bucketOrganizerMap.put(bucketId, organizer);
+ }
+
+ public void close() {
+ closed = true;
+
+ if (this.region.getHDFSWriteOnly() &&
+ hoplogCloseTimer != null) {
+ hoplogCloseTimer.cancel();
+ hoplogCloseTimer = null;
+ }
+ for (int bucket : bucketOrganizerMap.keySet()) {
+ close(bucket);
+ }
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public synchronized void close(int bucketId) {
+ try {
+ HoplogOrganizer organizer = bucketOrganizerMap.remove(bucketId);
+ if (organizer != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Closing hoplog organizer for " + region.getFullPath() + ":" +
+ bucketId + " " + organizer, logPrefix);
+ }
+ organizer.close();
+ }
+ } catch (IOException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(logPrefix + "Error closing hoplog organizer for " + region.getFullPath() + ":" + bucketId, e);
+ }
+ }
+ //TODO abort compaction and flush requests for this region
+ }
+
+ public static String getRegionFolder(String regionPath) {
+ String folder = regionPath;
+ //Change any underscore into a double underscore
+ folder = folder.replace("_", "__");
+ //get rid of the leading slash
+ folder = folder.replaceFirst("^/", "");
+ //replace slashes with underscores
+ folder = folder.replace('/', '_');
+ return folder;
+ }
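+ // Worked example of the escaping above: "/root/my_region" -> doubling
+ // underscores gives "/root/my__region" -> dropping the leading slash gives
+ // "root/my__region" -> mapping slashes to underscores gives "root_my__region".
+ // Doubling first keeps original underscores distinguishable from the
+ // underscores that replace path separators.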
+
+ public String getRegionFolder() {
+ return getRegionFolder(region.getFullPath());
+ }
+
+ public HoplogListener getListener() {
+ return listener;
+ }
+
+ public HDFSStoreImpl getStore() {
+ return store;
+ }
+
+ public LocalRegion getRegion() {
+ return region;
+ }
+
+ public SortedOplogStatistics getHdfsStats() {
+ return hdfsStats;
+ }
+
+ public Collection<HoplogOrganizer> getBucketOrganizers(){
+ return this.bucketOrganizerMap.values();
+ }
+
+ /**
+ * get the HoplogOrganizers only for the given set of buckets
+ */
+ public Collection<HoplogOrganizer> getBucketOrganizers(Set<Integer> buckets){
+ Set<HoplogOrganizer> result = new HashSet<HoplogOrganizer>();
+ for (Integer bucketId : buckets) {
+ result.add(this.bucketOrganizerMap.get(bucketId));
+ }
+ return result;
+ }
+
+ /**
+ * Delete all files from HDFS for this region. This method
+ * should be called after all members have destroyed their
+ * region in gemfire, so there should be no threads accessing
+ * these files.
+ * @throws IOException
+ */
+ public void destroyData() throws IOException {
+ //Make sure everything is shut down and closed.
+ close();
+ if (store == null) {
+ return;
+ }
+ Path regionPath = new Path(store.getHomeDir(), getRegionFolder());
+
+ //Delete all files in HDFS.
+ FileSystem fs = getStore().getFileSystem();
+ if(!fs.delete(regionPath, true)) {
+ if(fs.exists(regionPath)) {
+ throw new IOException("Unable to delete " + regionPath);
+ }
+ }
+ }
+
+ public void performMaintenance() throws IOException {
+ Collection<HoplogOrganizer> buckets = getBucketOrganizers();
+ for (HoplogOrganizer bucket : buckets) {
+ bucket.performMaintenance();
+ }
+ }
+ }
+
+ private class JanitorTask implements Runnable {
+ boolean terminated = false;
+ @Override
+ public void run() {
+ if (terminated) {
+ return;
+ }
+ fineLog("Executing HDFS Region janitor task", null);
+
+ Collection<HdfsRegionManager> regions = regionManagerMap.values();
+ for (HdfsRegionManager region : regions) {
+ fineLog("Maintaining region:" + region.getRegionFolder(), null);
+ try {
+ region.performMaintenance();
+ } catch (Throwable e) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR , region.getRegionFolder()));
+ logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, e.getMessage()));
+ fineLog(null, e);
+ }
+ }
+ }
+
+ public void terminate() {
+ terminated = true;
+ }
+ }
+
+ protected static void fineLog(String message, Throwable e) {
+ if(logger.isDebugEnabled()) {
+ logger.debug(message, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
new file mode 100644
index 0000000..880ef3e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+
+/**
+ * HDFSStoreDirector manages all instances of HDFSStoreImpl.
+ *
+ */
+public final class HDFSStoreDirector {
+ private final ConcurrentHashMap<String, HDFSStoreImpl> storeMap = new ConcurrentHashMap<String, HDFSStoreImpl>();
+
+ // singleton instance
+ private static volatile HDFSStoreDirector instance;
+
+ private HDFSStoreDirector() {
+
+ }
+
+ public static final HDFSStoreDirector getInstance() {
+ if (instance == null) {
+ synchronized (HDFSStoreDirector.class) {
+ if (instance == null)
+ instance = new HDFSStoreDirector();
+ }
+ }
+ return instance;
+ }
+
+ // Called when the region is created.
+ public final void addHDFSStore(HDFSStoreImpl hdfsStore){
+ this.storeMap.put(hdfsStore.getName(), hdfsStore);
+ }
+
+ public final HDFSStoreImpl getHDFSStore(String hdfsStoreName) {
+ return this.storeMap.get(hdfsStoreName);
+ }
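+ // Typical usage (a sketch; storeImpl and "myStore" are placeholders): a store
+ // is registered once at region-create time and later looked up by name:
+ //   HDFSStoreDirector.getInstance().addHDFSStore(storeImpl);
+ //   HDFSStoreImpl store = HDFSStoreDirector.getInstance().getHDFSStore("myStore");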
+
+ public final void removeHDFSStore(String hdfsStoreName) {
+ this.storeMap.remove(hdfsStoreName);
+ }
+
+ public void closeHDFSStores() {
+ Iterator<HDFSStoreImpl> it = this.storeMap.values().iterator();
+ while (it.hasNext()) {
+ HDFSStoreImpl hsi = it.next();
+ hsi.close();
+ }
+ this.storeMap.clear();
+ }
+
+ public ArrayList<HDFSStoreImpl> getAllHDFSStores() {
+ ArrayList<HDFSStoreImpl> hdfsStores = new ArrayList<HDFSStoreImpl>();
+ hdfsStores.addAll(this.storeMap.values());
+ return hdfsStores;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
new file mode 100644
index 0000000..cbb35cb
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Manages unsorted Hoplog files for a bucket (Streaming Ingest option). One instance
+ * exists per bucket in each PR.
+ */
+public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<UnsortedHoplogPersistedEvent> {
+ public static final String HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
+ + SEQ_HOPLOG_EXTENSION + "|" + TEMP_HOPLOG_EXTENSION + ")";
+ public static final Pattern HOPLOG_PATTERN = Pattern.compile(HOPLOG_REGEX);
+ protected static String TMP_FILE_NAME_REGEX = HOPLOG_NAME_REGEX + SEQ_HOPLOG_EXTENSION + TEMP_HOPLOG_EXTENSION + "$";
+ protected static final Pattern patternForTmpHoplog = Pattern.compile(TMP_FILE_NAME_REGEX);
+
+ private volatile HoplogWriter writer;
+ private volatile Hoplog currentHoplog;
+
+ private volatile long lastFlushTime = System.currentTimeMillis();
+
+ private volatile boolean abortFlush = false;
+ private FileSystem fileSystem;
+
+ public HDFSUnsortedHoplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
+ super(region, bucketId);
+ writer = null;
+ sequence = new AtomicInteger(0);
+
+ fileSystem = store.getFileSystem();
+ if (! fileSystem.exists(bucketPath)) {
+ return;
+ }
+
+ FileStatus[] validHoplogs = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+ @Override
+ public boolean accept(Path file) {
+ // All valid hoplog files must match the regex
+ Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
+ return matcher.matches();
+ }
+ });
+
+ if (validHoplogs != null && validHoplogs.length > 0) {
+ for (FileStatus file : validHoplogs) {
+ // account for the disk used by this file
+ incrementDiskUsage(file.getLen());
+ }
+ }
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ if (logger.isDebugEnabled())
+ logger.debug("{}Closing the hoplog organizer and the open files", logPrefix);
+ // abort the flush so that we can immediately call the close current writer.
+ abortFlush = true;
+ synchronizedCloseWriter(true, 0, 0);
+ }
+
+
+ /**
+ * Flushes the data to HDFS.
+ * Synchronization ensures that the writer is not closed when flush is happening.
+ * To abort the flush, abortFlush needs to be set.
+ * @throws ForceReattemptException
+ */
+ @Override
+ public synchronized void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, final int count)
+ throws IOException, ForceReattemptException {
+ assert bufferIter != null;
+
+ if (abortFlush)
+ throw new CacheClosedException("Either the region has been cleared " +
+ "or closed. Aborting the ongoing flush operation.");
+ if (logger.isDebugEnabled())
+ logger.debug("{}Initializing flush operation", logPrefix);
+
+ // variables for updating stats
+ long start = stats.getFlush().begin();
+ int byteCount = 0;
+ if (writer == null) {
+ // Hoplogs of sequence files are always created with a 0 sequence number
+ currentHoplog = getTmpSortedOplog(0, SEQ_HOPLOG_EXTENSION);
+ try {
+ writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
+ @Override
+ public HoplogWriter call() throws Exception {
+ return currentHoplog.createWriter(count);
+ }
+ });
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException)e;
+ }
+ throw new IOException(e);
+ }
+ }
+ long timeSinceLastFlush = (System.currentTimeMillis() - lastFlushTime)/1000 ;
+
+ try {
+ /** MergeGemXDHDFSToGFE: changed the following statement because the HeapDataOutputStream code has not been merged */
+ //HeapDataOutputStream out = new HeapDataOutputStream();
+ while (bufferIter.hasNext()) {
+ HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
+ if (abortFlush) {
+ stats.getFlush().end(byteCount, start);
+ throw new CacheClosedException("Either the region has been cleared " +
+ "or closed. Aborting the ongoing flush operation.");
+ }
+ QueuedPersistentEvent item = bufferIter.next();
+ item.toHoplogEventBytes(out);
+ byte[] valueBytes = out.toByteArray();
+ writer.append(item.getRawKey(), valueBytes);
+ // add key length and value length to stats byte counter
+ byteCount += (item.getRawKey().length + valueBytes.length);
+ /** MergeGemXDHDFSToGFE: how to clear for reuse? Leaving this for Darrel to merge */
+ //out.clearForReuse();
+ }
+ // ping secondaries before making the file a legitimate file to ensure
+ // that in case of split brain, no other vm has taken up as primary. #50110.
+ if (!abortFlush)
+ pingSecondaries();
+ // append completed. If the file is to be rolled over,
+ // close writer and rename the file to a legitimate name.
+ // Else, sync the already written data with HDFS nodes.
+ int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024;
+ int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval();
+ if (writer.getCurrentSize() >= maxFileSize ||
+ timeSinceLastFlush >= fileRolloverInterval) {
+ closeCurrentWriter();
+ }
+ else {
+ // if flush is not aborted, hsync the batch. It ensures that
+ // the batch has reached HDFS and we can discard it.
+ if (!abortFlush)
+ writer.hsync();
+ }
+ } catch (IOException e) {
+ stats.getFlush().error(start);
+ // as there is an exception, it can be probably be a file specific problem.
+ // close the current file to avoid any file specific issues next time
+ closeCurrentWriter();
+ // throw the exception so that async queue will dispatch the same batch again
+ throw e;
+ }
+
+ stats.getFlush().end(byteCount, start);
+ }
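+ // Rollover arithmetic used in flush() above: the size threshold is
+ // getWriteOnlyFileRolloverSize() megabytes, converted to bytes via
+ // * 1024 * 1024; the time threshold is getWriteOnlyFileRolloverInterval()
+ // seconds, compared against timeSinceLastFlush, which is likewise tracked
+ // in seconds.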
+
+ /**
+ * Synchronization ensures that the writer is not closed when flush is happening.
+ */
+ synchronized void synchronizedCloseWriter(boolean forceClose,
+ long timeSinceLastFlush, int minsizeforrollover) throws IOException {
+ long writerSize = 0;
+ if (writer != null){
+ writerSize = writer.getCurrentSize();
+ }
+
+ if (writerSize < (minsizeforrollover * 1024L))
+ return;
+
+ int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024;
+ int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval();
+ if (writerSize >= maxFileSize ||
+ timeSinceLastFlush >= fileRolloverInterval || forceClose) {
+ closeCurrentWriter();
+ }
+ }
+
+
+ /**
+ * Closes the current writer so that next time a new hoplog can
+ * be created. Also, fixes any tmp hoplogs.
+ *
+ * @throws IOException
+ */
+ void closeCurrentWriter() throws IOException {
+
+ if (writer != null) {
+ // If this organizer is closing, it is ok to ignore exceptions here
+ // because CloseTmpHoplogsTimerTask
+ // on another member may have already renamed the hoplog
+ // fixes bug 49141
+ boolean isClosing = abortFlush;
+ try {
+ incrementDiskUsage(writer.getCurrentSize());
+ } catch (IOException e) {
+ if (!isClosing) {
+ throw e;
+ }
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("{}Closing hoplog " + currentHoplog.getFileName(), logPrefix);
+ try{
+ writer.close();
+ makeLegitimate(currentHoplog);
+ } catch (IOException e) {
+ if (!isClosing) {
+ logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
+ throw e;
+ }
+ } finally {
+ writer = null;
+ lastFlushTime = System.currentTimeMillis();
+ }
+ }
+ else
+ lastFlushTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void clear() throws IOException {
+ boolean prevAbortFlushFlag = abortFlush;
+ // abort the flush so that we can immediately call the close current writer.
+ abortFlush = true;
+
+ // Close if there is any existing writer.
+ try {
+ synchronizedCloseWriter(true, 0, 0);
+ } catch (IOException e) {
+ logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
+ }
+
+ // reenable the aborted flush
+ abortFlush = prevAbortFlushFlag;
+
+ // Mark the hoplogs for deletion
+ markHoplogsForDeletion();
+
+ }
+
+ @Override
+ public void performMaintenance() {
+ // TODO remove the timer for tmp file conversion. Use this instead
+ }
+
+ @Override
+ public Future<CompactionStatus> forceCompaction(boolean isMajor) {
+ return null;
+ }
+
+ @Override
+ protected Hoplog getHoplog(Path hoplogPath) throws IOException {
+ Hoplog so = new SequenceFileHoplog(fileSystem, hoplogPath, stats);
+ return so;
+ }
+
+ /**
+ * Fixes the size of hoplogs that were not closed properly last time.
+ * Such hoplogs are *.tmphop files. Identify them, then open and close
+ * them to fix their size, and finally rename them to *.hop.
+ *
+ * @throws IOException
+ * @throws ForceReattemptException
+ */
+ void identifyAndFixTmpHoplogs(FileSystem fs) throws IOException, ForceReattemptException {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Fixing temporary hoplogs", logPrefix);
+
+ // A different filesystem is passed to this function for the following reason:
+ // For HDFS, if a file wasn't closed properly last time,
+ // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
+ // FSNamesystem.recoverLeaseInternal function gets called.
+ // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file,
+ // created using the same FileSystem object. This is a bug and is being tracked at:
+ // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
+ //
+ // The fix for this bug is not yet part of Pivotal HD, so we create a new
+ // file system for the timer task to avoid triggering it.
+
+ FileStatus[] tmpHoplogs = FSUtils.listStatus(fs, fs.makeQualified(bucketPath), new PathFilter() {
+ @Override
+ public boolean accept(Path file) {
+ // All valid hoplog files must match the regex
+ Matcher matcher = patternForTmpHoplog.matcher(file.getName());
+ return matcher.matches();
+ }
+ });
+
+ if (tmpHoplogs == null || tmpHoplogs.length == 0) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}No files to fix", logPrefix);
+ return;
+ }
+ // ping secondaries so that in case of split brain, no other vm has taken up
+ // as primary. #50110.
+ pingSecondaries();
+ if (logger.isDebugEnabled())
+ logger.debug("{}Files to fix " + tmpHoplogs.length, logPrefix);
+
+ String currentHoplogName = null;
+ // get the current hoplog name. We need to ignore current hoplog while fixing.
+ if (currentHoplog != null) {
+ currentHoplogName = currentHoplog.getFileName();
+ }
+
+ for (int i = 0; i < tmpHoplogs.length; i++) {
+ // Skip directories
+ if (tmpHoplogs[i].isDirectory()) {
+ continue;
+ }
+
+ final Path p = tmpHoplogs[i].getPath();
+
+ if (tmpHoplogs[i].getPath().getName().equals(currentHoplogName)){
+ if (logger.isDebugEnabled())
+ logger.debug("Skipping current file: " + tmpHoplogs[i].getPath().getName(), logPrefix);
+ continue;
+ }
+
+ SequenceFileHoplog hoplog = new SequenceFileHoplog(fs, p, stats);
+ try {
+ makeLegitimate(hoplog);
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " was a temporary " +
+ "hoplog because the node managing it wasn't shut down properly last time. Fixed the hoplog name."));
+ } catch (IOException e) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " is still a temporary " +
+ "hoplog because the node managing it wasn't shut down properly last time. Failed to " +
+ "change the hoplog name because an exception was thrown while fixing it. " + e));
+ }
+ }
+ }
+
+ private FileStatus[] getExpiredHoplogs() throws IOException {
+ FileStatus[] files = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+ @Override
+ public boolean accept(Path file) {
+ // all expired hoplogs are marked with the expired-hoplog extension
+ return file.getName().endsWith(EXPIRED_HOPLOG_EXTENSION);
+ }
+ });
+ return files;
+ }
+ /**
+ * Marks all valid hoplogs of this bucket for deletion by adding an expiry
+ * marker to each file; the marked files are removed later
+ * @throws IOException
+ */
+ private void markHoplogsForDeletion() throws IOException {
+
+ ArrayList<IOException> errors = new ArrayList<IOException>();
+ FileStatus[] validHoplogs = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+ @Override
+ public boolean accept(Path file) {
+ // All valid hoplog files must match the regex
+ Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
+ return matcher.matches();
+ }
+ });
+
+ FileStatus[] expired = getExpiredHoplogs();
+ validHoplogs = filterValidHoplogs(validHoplogs, expired);
+
+ if (validHoplogs == null || validHoplogs.length == 0) {
+ return;
+ }
+ for (FileStatus fileStatus : validHoplogs) {
+ try {
+ addExpiryMarkerForAFile(getHoplog(fileStatus.getPath()));
+ } catch (IOException e) {
+ // even if there is an IO error continue removing other hoplogs and
+ // notify at the end
+ errors.add(e);
+ }
+ }
+
+ if (!errors.isEmpty()) {
+ for (IOException e : errors) {
+ logger.warn(LocalizedStrings.HOPLOG_HOPLOG_REMOVE_FAILED, e);
+ }
+ }
+ }
+
+ @Override
+ public Compactor getCompactor() {
+ throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+ }
+
+ @Override
+ public HoplogIterator<byte[], UnsortedHoplogPersistedEvent> scan(
+ long startOffset, long length) throws IOException {
+ throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+ }
+
+ public long getLastFlushTime() {
+ return this.lastFlushTime;
+ }
+
+ public long getfileRolloverInterval() {
+ return this.store.getWriteOnlyFileRolloverInterval();
+ }
+
+ @Override
+ public long getLastMajorCompactionTimestamp() {
+ throw new UnsupportedOperationException();
+ }
+
+}