You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/04 22:57:12 UTC
[12/63] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS
related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
deleted file mode 100644
index cdf7452..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.FunctionException;
-import com.gemstone.gemfire.cache.execute.FunctionService;
-import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.InternalEntity;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
-import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-public class HDFSFlushQueueFunction implements Function, InternalEntity{
- private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3);
- private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");
- private static final Logger logger = LogService.getLogger();
- private static final String ID = HDFSFlushQueueFunction.class.getName();
-
- public static void flushQueue(PartitionedRegion pr, int maxWaitTime) {
-
- Set<Integer> buckets = new HashSet<Integer>(pr.getRegionAdvisor().getBucketSet());
-
- maxWaitTime *= 1000;
- long start = System.currentTimeMillis();
-
- int retries = 0;
- long remaining = 0;
- while (retries++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitTime)) > 0) {
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets
- + ", attempt = " + retries
- + ", remaining = " + remaining));
- }
-
- HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);
-
- HDFSFlushQueueResultCollector rc = new HDFSFlushQueueResultCollector(buckets);
- AbstractExecution exec = (AbstractExecution) FunctionService
- .onRegion(pr)
- .withArgs(args)
- .withCollector(rc);
- exec.setWaitOnExceptionFlag(true);
-
- try {
- exec.execute(ID);
- if (rc.getResult()) {
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully"));
- }
- return;
- }
- } catch (FunctionException e) {
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e);
- }
- }
-
- buckets.removeAll(rc.getSuccessfulBuckets());
- for (int bucketId : buckets) {
- remaining = waitTime(start, maxWaitTime);
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId));
- }
- pr.getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper((int) remaining));
- }
- }
-
- pr.checkReadiness();
- throw new FunctionException("Unable to flush the following buckets: " + buckets);
- }
-
- private static long waitTime(long start, long max) {
- if (max == 0) {
- return Integer.MAX_VALUE;
- }
- return start + max - System.currentTimeMillis();
- }
-
- @Override
- public void execute(FunctionContext context) {
- RegionFunctionContext rfc = (RegionFunctionContext) context;
- PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
-
- HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) rfc.getArguments();
- Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
- buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
-
- Map<Integer, AsyncFlushResult> flushes = new HashMap<Integer, AsyncFlushResult>();
- for (int bucketId : buckets) {
- try {
- HDFSBucketRegionQueue brq = getQueue(pr, bucketId);
- if (brq != null) {
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId));
- }
- flushes.put(bucketId, brq.flush());
- }
- } catch (ForceReattemptException e) {
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e);
- }
- }
- }
-
- try {
- long start = System.currentTimeMillis();
- for (Map.Entry<Integer, AsyncFlushResult> flush : flushes.entrySet()) {
- long remaining = waitTime(start, args.getMaxWaitTime());
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + flush.getKey()
- + " to complete flushing, remaining = " + remaining));
- }
-
- if (flush.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + flush.getKey() + " flushed successfully"));
- }
- rfc.getResultSender().sendResult(new FlushStatus(flush.getKey()));
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result"));
- }
- rfc.getResultSender().lastResult(FlushStatus.last());
- }
-
- private HDFSBucketRegionQueue getQueue(PartitionedRegion pr, int bucketId)
- throws ForceReattemptException {
- AsyncEventQueueImpl aeq = pr.getHDFSEventQueue();
- AbstractGatewaySender gw = (AbstractGatewaySender) aeq.getSender();
- AbstractGatewaySenderEventProcessor ep = gw.getEventProcessor();
- if (ep == null) {
- return null;
- }
-
- ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue) ep.getQueue();
- return queue.getBucketRegionQueue(pr, bucketId);
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- @Override
- public boolean hasResult() {
- return true;
- }
-
- @Override
- public boolean optimizeForWrite() {
- return true;
- }
-
- @Override
- public boolean isHA() {
- return false;
- }
-
- public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
- private final CountDownLatch complete;
- private final Set<Integer> expectedBuckets;
- private final Set<Integer> successfulBuckets;
-
- private volatile ReplyProcessor21 processor;
-
- public HDFSFlushQueueResultCollector(Set<Integer> expectedBuckets) {
- this.expectedBuckets = expectedBuckets;
-
- complete = new CountDownLatch(1);
- successfulBuckets = new HashSet<Integer>();
- }
-
- public Set<Integer> getSuccessfulBuckets() {
- synchronized (successfulBuckets) {
- return new HashSet<Integer>(successfulBuckets);
- }
- }
-
- @Override
- public Boolean getResult() throws FunctionException {
- try {
- complete.await();
- synchronized (successfulBuckets) {
- LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n();
- if (logger.fineEnabled() || VERBOSE) {
- logger.info(LocalizedStrings.DEBUG, "Expected buckets: " + expectedBuckets);
- logger.info(LocalizedStrings.DEBUG, "Successful buckets: " + successfulBuckets);
- }
- return expectedBuckets.equals(successfulBuckets);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
- throw new FunctionException(e);
- }
- }
-
- @Override
- public Boolean getResult(long timeout, TimeUnit unit)
- throws FunctionException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public synchronized void addResult(DistributedMember memberID, Object result) {
- if (result instanceof FlushStatus) {
- FlushStatus status = (FlushStatus) result;
- if (!status.isLast()) {
- synchronized (successfulBuckets) {
- successfulBuckets.add(status.getBucketId());
- }
- }
- }
- }
-
- @Override
- public void endResults() {
- complete.countDown();
- }
-
- @Override
- public void clearResults() {
- }
-
- @Override
- public void setProcessor(ReplyProcessor21 processor) {
- this.processor = processor;
- }
-
- @Override
- public ReplyProcessor21 getProcessor() {
- return processor;
- }
-
- @Override
- public void setException(Throwable exception) {
- // TODO Auto-generated method stub
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
deleted file mode 100644
index ec0f9ff..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Arguments passed to the HDFSForceCompactionFunction
- *
- */
-@SuppressWarnings("serial")
-public class HDFSForceCompactionArgs implements VersionedDataSerializable {
-
- private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
-
- private HashSet<Integer> buckets;
-
- private boolean isMajor;
-
- private int maxWaitTime;
-
- public HDFSForceCompactionArgs() {
- }
-
- public HDFSForceCompactionArgs(Set<Integer> buckets, boolean isMajor, Integer maxWaitTime) {
- this.buckets = new HashSet<Integer>(buckets);
- this.isMajor = isMajor;
- this.maxWaitTime = maxWaitTime;
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- DataSerializer.writeHashSet(buckets, out);
- out.writeBoolean(isMajor);
- out.writeInt(maxWaitTime);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException,
- ClassNotFoundException {
- this.buckets = DataSerializer.readHashSet(in);
- this.isMajor = in.readBoolean();
- this.maxWaitTime = in.readInt();
- }
-
- @Override
- public Version[] getSerializationVersions() {
- return serializationVersions;
- }
-
- public Set<Integer> getBuckets() {
- return (Set<Integer>) buckets;
- }
-
- public void setBuckets(Set<Integer> buckets) {
- this.buckets = new HashSet<Integer>(buckets);
- }
-
- public boolean isMajor() {
- return isMajor;
- }
-
- public void setMajor(boolean isMajor) {
- this.isMajor = isMajor;
- }
-
- public boolean isSynchronous() {
- return maxWaitTime == 0;
- }
-
- public int getMaxWaitTime() {
- return this.maxWaitTime;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(getClass().getCanonicalName()).append("@")
- .append(System.identityHashCode(this))
- .append(" buckets:").append(buckets)
- .append(" isMajor:").append(isMajor)
- .append(" maxWaitTime:").append(maxWaitTime);
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
deleted file mode 100644
index d26ac1b..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.internal.InternalEntity;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Function responsible for forcing a compaction on all members
- * of the system
- *
- */
-@SuppressWarnings("serial")
-public class HDFSForceCompactionFunction implements Function, InternalEntity {
-
- public static final int FORCE_COMPACTION_MAX_RETRIES = Integer.getInteger("gemfireXD.maxCompactionRetries", 3);
-
- public static final int BUCKET_ID_FOR_LAST_RESULT = -1;
-
- public static final String ID = "HDFSForceCompactionFunction";
-
- private static final Logger logger = LogService.getLogger();
-
- @Override
- public void execute(FunctionContext context) {
- if (context.isPossibleDuplicate()) {
- // do not re-execute the function, another function
- // targeting the failed buckets will be invoked
- context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, false));
- return;
- }
- RegionFunctionContext rfc = (RegionFunctionContext) context;
- PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
- HDFSForceCompactionArgs args = (HDFSForceCompactionArgs) rfc.getArguments();
- Set<Integer> buckets = new HashSet<Integer>(args.getBuckets()); // copying avoids race when the function coordinator
- // also runs the function locally
- buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
-
- List<Future<CompactionStatus>> futures = pr.forceLocalHDFSCompaction(buckets, args.isMajor(), 0);
- int waitFor = args.getMaxWaitTime();
- for (Future<CompactionStatus> future : futures) {
- long start = System.currentTimeMillis();
- CompactionStatus status = null;
- try {
- // TODO use a CompletionService instead
- if (!args.isSynchronous() && waitFor <= 0) {
- break;
- }
- status = args.isSynchronous() ? future.get() : future.get(waitFor, TimeUnit.MILLISECONDS);
- buckets.remove(status.getBucketId());
- if (logger.isDebugEnabled()) {
- logger.debug("HDFS: ForceCompaction sending result:"+status);
- }
- context.getResultSender().sendResult(status);
- long elapsedTime = System.currentTimeMillis() - start;
- waitFor -= elapsedTime;
- } catch (InterruptedException e) {
- // send a list of failed buckets after waiting for all buckets
- } catch (ExecutionException e) {
- // send a list of failed buckets after waiting for all buckets
- } catch (TimeoutException e) {
- // do not wait for other buckets to complete
- break;
- }
- }
- // for asynchronous invocation, the status is true for buckets that we did not wait for
- boolean status = args.isSynchronous() ? false : true;
- for (Integer bucketId : buckets) {
- if (logger.isDebugEnabled()) {
- logger.debug("HDFS: ForceCompaction sending result for bucket:"+bucketId);
- }
- context.getResultSender().sendResult(new CompactionStatus(bucketId, status));
- }
- if (logger.isDebugEnabled()) {
- logger.debug("HDFS: ForceCompaction sending last result");
- }
- context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, true));
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- @Override
- public boolean hasResult() {
- return true;
- }
-
- @Override
- public boolean optimizeForWrite() {
- // run compaction on primary members
- return true;
- }
-
- @Override
- public boolean isHA() {
- // so that we can target re-execution on failed buckets
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
deleted file mode 100644
index ee5e4aa..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.gemstone.gemfire.cache.execute.FunctionException;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
-
-/**
- *
- */
-public class HDFSForceCompactionResultCollector implements LocalResultCollector<Object, List<CompactionStatus>> {
-
- /** list of received replies*/
- private List<CompactionStatus> reply = new ArrayList<CompactionStatus>();
-
- /** semaphore to block the caller of getResult()*/
- private CountDownLatch waitForResults = new CountDownLatch(1);
-
- /** boolean to indicate if clearResults() was called to indicate a failure*/
- private volatile boolean shouldRetry;
-
- private ReplyProcessor21 processor;
-
- @Override
- public List<CompactionStatus> getResult() throws FunctionException {
- try {
- waitForResults.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
- throw new FunctionException(e);
- }
- return reply;
- }
-
- @Override
- public List<CompactionStatus> getResult(long timeout, TimeUnit unit)
- throws FunctionException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void addResult(DistributedMember memberID,
- Object resultOfSingleExecution) {
- if (resultOfSingleExecution instanceof CompactionStatus) {
- CompactionStatus status = (CompactionStatus) resultOfSingleExecution;
- if (status.getBucketId() != HDFSForceCompactionFunction.BUCKET_ID_FOR_LAST_RESULT) {
- reply.add(status);
- }
- }
- }
-
- @Override
- public void endResults() {
- waitForResults.countDown();
- }
-
- @Override
- public void clearResults() {
- this.shouldRetry = true;
- waitForResults.countDown();
- }
-
- /**
- * @return true if retry should be attempted
- */
- public boolean shouldRetry() {
- return this.shouldRetry || !getFailedBucketIds().isEmpty();
- }
-
- private Set<Integer> getFailedBucketIds() {
- Set<Integer> result = new HashSet<Integer>();
- for (CompactionStatus status : reply) {
- if (!status.isStatus()) {
- result.add(status.getBucketId());
- }
- }
- return result;
- }
-
- public Set<Integer> getSuccessfulBucketIds() {
- Set<Integer> result = new HashSet<Integer>();
- for (CompactionStatus status : reply) {
- if (status.isStatus()) {
- result.add(status.getBucketId());
- }
- }
- return result;
- }
-
- @Override
- public void setProcessor(ReplyProcessor21 processor) {
- this.processor = processor;
- }
-
- @Override
- public ReplyProcessor21 getProcessor() {
- return this.processor;
- }
-
-@Override
-public void setException(Throwable exception) {
- // TODO Auto-generated method stub
-
-}
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
deleted file mode 100644
index 789fe4d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import com.gemstone.gemfire.cache.execute.FunctionAdapter;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.internal.InternalEntity;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-
-/**
- * Function that returns the oldest timestamp among all the major
- * compacted buckets on the members
- *
- */
-@SuppressWarnings("serial")
-public class HDFSLastCompactionTimeFunction extends FunctionAdapter implements InternalEntity{
-
- public static final String ID = "HDFSLastCompactionTimeFunction";
-
- @Override
- public void execute(FunctionContext context) {
- RegionFunctionContext rfc = (RegionFunctionContext) context;
- PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
- rfc.getResultSender().lastResult(pr.lastLocalMajorHDFSCompaction());
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- @Override
- public boolean isHA() {
- return true;
- }
-
- @Override
- public boolean optimizeForWrite() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
deleted file mode 100644
index 6d70dce..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.StatisticsFactory;
-import com.gemstone.gemfire.cache.GemFireCache;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.SystemTimer;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Cache for hoplog organizers associated with buckets of a region. The director creates an
- * instance of organizer on first get request. It does not read HDFS in advance. Creation of
- * organizer depends on File system initialization that takes outside this class. This class also
- * provides utility methods to monitor usage and manage bucket sets.
- *
- */
-public class HDFSRegionDirector {
- /*
- * Maps each region name to its listener and store objects. This map must be populated before file
- * organizers of a bucket can be created
- */
- private final ConcurrentHashMap<String, HdfsRegionManager> regionManagerMap;
-
- /**
- * regions of this Gemfire cache are managed by this director. TODO this
- * should be final and be provided at the time of creation of this instance or
- * through a cache directory
- */
- private GemFireCache cache;
-
- // singleton instance
- private static HDFSRegionDirector instance;
-
- final ScheduledExecutorService janitor;
- private JanitorTask janitorTask;
-
- private static final Logger logger = LogService.getLogger();
- protected final static String logPrefix = "<" + "RegionDirector" + "> ";
-
-
- private HDFSRegionDirector() {
- regionManagerMap = new ConcurrentHashMap<String, HDFSRegionDirector.HdfsRegionManager>();
- janitor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "HDFSRegionJanitor");
- thread.setDaemon(true);
- return thread;
- }
- });
-
- long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
- HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
-
- janitorTask = new JanitorTask();
- janitor.scheduleWithFixedDelay(janitorTask, interval, interval,
- TimeUnit.SECONDS);
- }
-
- public synchronized static HDFSRegionDirector getInstance() {
- if (instance == null) {
- instance = new HDFSRegionDirector();
- }
- return instance;
- }
-
- public HDFSRegionDirector setCache(GemFireCache cache) {
- this.cache = cache;
- return this;
- }
-
- public GemFireCache getCache() {
- return this.cache;
- }
- /**
- * Caches listener, store object and list of organizers associated with the region associated with
- * a region. Subsequently, these objects will be used each time an organizer is created
- */
- public synchronized HdfsRegionManager manageRegion(LocalRegion region, String storeName,
- HoplogListener listener) {
-
- HdfsRegionManager manager = regionManagerMap.get(region.getFullPath());
- if (manager != null) {
- // this is an attempt to re-register a region. Assuming this was required
- // to modify listener or hdfs store impl associated with the region. Hence
- // will clear the region first.
-
- clear(region.getFullPath());
- }
-
- HDFSStoreImpl store = HDFSStoreDirector.getInstance().getHDFSStore(storeName);
- manager = new HdfsRegionManager(region, store, listener, getStatsFactory(), this);
- regionManagerMap.put(region.getFullPath(), manager);
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Now managing region " + region.getFullPath(), logPrefix);
- }
-
- return manager;
- }
-
- /**
- * Find the regions that are part of a particular HDFS store.
- */
- public Collection<String> getRegionsInStore(HDFSStore store) {
- TreeSet<String> regions = new TreeSet<String>();
- for(Map.Entry<String, HdfsRegionManager> entry : regionManagerMap.entrySet()) {
- if(entry.getValue().getStore().equals(store)) {
- regions.add(entry.getKey());
- }
- }
- return regions;
- }
-
- public int getBucketCount(String regionPath) {
- HdfsRegionManager manager = regionManagerMap.get(regionPath);
- if (manager == null) {
- throw new IllegalStateException("Region not initialized");
- }
-
- return manager.bucketOrganizerMap.size();
- }
-
- public void closeWritersForRegion(String regionPath, int minSizeForFileRollover) throws IOException {
- regionManagerMap.get(regionPath).closeWriters(minSizeForFileRollover);
- }
- /**
- * removes and closes all {@link HoplogOrganizer} of this region. This call is expected with
- * a PR disowns a region.
- */
- public synchronized void clear(String regionPath) {
- HdfsRegionManager manager = regionManagerMap.remove(regionPath);
- if (manager != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing hoplog region manager for " + regionPath, logPrefix);
- }
- manager.close();
- }
- }
-
- /**
- * Closes all region managers, organizers and hoplogs. This method should be
- * called before closing the cache to gracefully release all resources
- */
- public static synchronized void reset() {
- if (instance == null) {
- // nothing to reset
- return;
- }
-
- instance.janitor.shutdownNow();
-
- for (String region : instance.regionManagerMap.keySet()) {
- instance.clear(region);
- }
- instance.cache = null;
- instance = null;
- }
-
- /**
- * Terminates current janitor task and schedules a new. The rate of the new
- * task is based on the value of system property at that time
- */
- public static synchronized void resetJanitor() {
- instance.janitorTask.terminate();
- instance.janitorTask = instance.new JanitorTask();
- long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
- HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
- instance.janitor.scheduleWithFixedDelay(instance.janitorTask, 0, interval,
- TimeUnit.SECONDS);
- }
-
- /**
- * @param regionPath name of region for which stats object is desired
- * @return {@link SortedOplogStatistics} instance associated with hdfs region
- * name. Null if region is not managed by director
- */
- public synchronized SortedOplogStatistics getHdfsRegionStats(String regionPath) {
- HdfsRegionManager manager = regionManagerMap.get(regionPath);
- return manager == null ? null : manager.getHdfsStats();
- }
-
- private StatisticsFactory getStatsFactory() {
- return cache.getDistributedSystem();
- }
-
- /**
- * A helper class to manage region and its organizers
- */
- public static class HdfsRegionManager {
- // name and store configuration of the region whose buckets are managed by this director.
- private LocalRegion region;
- private HDFSStoreImpl store;
- private HoplogListener listener;
- private volatile boolean closed = false;
- private final int FILE_ROLLOVER_TASK_INTERVAL = Integer.parseInt
- (System.getProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "60"));
-
- private SystemTimer hoplogCloseTimer = null;
-
- // instance of hdfs statistics object for this hdfs based region. This
- // object will collect usage and performance related statistics.
- private final SortedOplogStatistics hdfsStats;
-
- /*
- * An instance of organizer is created for each bucket of regionName region residing on this
- * node. This member maps bucket id with its corresponding organizer instance. A lock is used to
- * manage concurrent writes to the map.
- */
- private ConcurrentMap<Integer, HoplogOrganizer> bucketOrganizerMap;
-
- private HDFSRegionDirector hdfsRegionDirector;
-
- /**
- * @param listener
- * listener of change events like file creation and deletion
- * @param hdfsRegionDirector
- */
- HdfsRegionManager(LocalRegion region, HDFSStoreImpl store,
- HoplogListener listener, StatisticsFactory statsFactory, HDFSRegionDirector hdfsRegionDirector) {
- bucketOrganizerMap = new ConcurrentHashMap<Integer, HoplogOrganizer>();
- this.region = region;
- this.listener = listener;
- this.store = store;
- this.hdfsStats = new SortedOplogStatistics(statsFactory, "HDFSRegionStatistics", region.getFullPath());
- this.hdfsRegionDirector = hdfsRegionDirector;
- }
-
- public void closeWriters(int minSizeForFileRollover) throws IOException {
- final long startTime = System.currentTimeMillis();
- long elapsedTime = 0;
-
- Collection<HoplogOrganizer> organizers = bucketOrganizerMap.values();
-
- for (HoplogOrganizer organizer : organizers) {
-
- try {
- this.getRegion().checkReadiness();
- } catch (Exception e) {
- break;
- }
-
- ((HDFSUnsortedHoplogOrganizer)organizer).synchronizedCloseWriter(true, 0,
- minSizeForFileRollover);
- }
-
- }
-
- public synchronized <T extends PersistedEventImpl> HoplogOrganizer<T> create(int bucketId) throws IOException {
- assert !bucketOrganizerMap.containsKey(bucketId);
-
- HoplogOrganizer<?> organizer = region.getHDFSWriteOnly()
- ? new HDFSUnsortedHoplogOrganizer(this, bucketId)
- : new HdfsSortedOplogOrganizer(this, bucketId);
-
- bucketOrganizerMap.put(bucketId, organizer);
- // initialize a timer that periodically closes the hoplog writer if the
- // time for rollover has passed. It also has the responsibility to fix the files.
- if (this.region.getHDFSWriteOnly() &&
- hoplogCloseTimer == null) {
- hoplogCloseTimer = new SystemTimer(hdfsRegionDirector.
- getCache().getDistributedSystem(), true);
-
- // schedule the task to fix the files that were not closed properly
- // last time.
- hoplogCloseTimer.scheduleAtFixedRate(new CloseTmpHoplogsTimerTask(this),
- 1000, FILE_ROLLOVER_TASK_INTERVAL * 1000);
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Schedulng hoplog rollover timer with interval "+ FILE_ROLLOVER_TASK_INTERVAL +
- " for hoplog organizer for " + region.getFullPath()
- + ":" + bucketId + " " + organizer, logPrefix);
- }
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Constructed hoplog organizer for " + region.getFullPath()
- + ":" + bucketId + " " + organizer, logPrefix);
- }
- return (HoplogOrganizer<T>) organizer;
- }
-
- public synchronized <T extends PersistedEventImpl> void addOrganizer(
- int bucketId, HoplogOrganizer<T> organizer) {
- if (bucketOrganizerMap.containsKey(bucketId)) {
- throw new IllegalArgumentException();
- }
- if (logger.isDebugEnabled()) {
- logger.debug("{}added pre constructed organizer " + region.getFullPath()
- + ":" + bucketId + " " + organizer, logPrefix);
- }
- bucketOrganizerMap.put(bucketId, organizer);
- }
-
- public void close() {
- closed = true;
-
- if (this.region.getHDFSWriteOnly() &&
- hoplogCloseTimer != null) {
- hoplogCloseTimer.cancel();
- hoplogCloseTimer = null;
- }
- for (int bucket : bucketOrganizerMap.keySet()) {
- close(bucket);
- }
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public synchronized void close(int bucketId) {
- try {
- HoplogOrganizer organizer = bucketOrganizerMap.remove(bucketId);
- if (organizer != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing hoplog organizer for " + region.getFullPath() + ":" +
- bucketId + " " + organizer, logPrefix);
- }
- organizer.close();
- }
- } catch (IOException e) {
- if (logger.isDebugEnabled()) {
- logger.debug(logPrefix + "Error closing hoplog organizer for " + region.getFullPath() + ":" + bucketId, e);
- }
- }
- //TODO abort compaction and flush requests for this region
- }
-
- public static String getRegionFolder(String regionPath) {
- String folder = regionPath;
- //Change any underscore into a double underscore
- folder = folder.replace("_", "__");
- //get rid of the leading slash
- folder = folder.replaceFirst("^/", "");
- //replace slashes with underscores
- folder = folder.replace('/', '_');
- return folder;
- }
-
- public String getRegionFolder() {
- return getRegionFolder(region.getFullPath());
- }
-
- public HoplogListener getListener() {
- return listener;
- }
-
- public HDFSStoreImpl getStore() {
- return store;
- }
-
- public LocalRegion getRegion() {
- return region;
- }
-
- public SortedOplogStatistics getHdfsStats() {
- return hdfsStats;
- }
-
- public Collection<HoplogOrganizer> getBucketOrganizers(){
- return this.bucketOrganizerMap.values();
- }
-
- /**
- * get the HoplogOrganizers only for the given set of buckets
- */
- public Collection<HoplogOrganizer> getBucketOrganizers(Set<Integer> buckets){
- Set<HoplogOrganizer> result = new HashSet<HoplogOrganizer>();
- for (Integer bucketId : buckets) {
- result.add(this.bucketOrganizerMap.get(bucketId));
- }
- return result;
- }
-
- /**
- * Delete all files from HDFS for this region. This method
- * should be called after all members have destroyed their
- * region in gemfire, so there should be no threads accessing
- * these files.
- * @throws IOException
- */
- public void destroyData() throws IOException {
- //Make sure everything is shut down and closed.
- close();
- if (store == null) {
- return;
- }
- Path regionPath = new Path(store.getHomeDir(), getRegionFolder());
-
- //Delete all files in HDFS.
- FileSystem fs = getStore().getFileSystem();
- if(!fs.delete(regionPath, true)) {
- if(fs.exists(regionPath)) {
- throw new IOException("Unable to delete " + regionPath);
- }
- }
- }
-
- public void performMaintenance() throws IOException {
- Collection<HoplogOrganizer> buckets = getBucketOrganizers();
- for (HoplogOrganizer bucket : buckets) {
- bucket.performMaintenance();
- }
- }
- }
-
- private class JanitorTask implements Runnable {
- boolean terminated = false;
- @Override
- public void run() {
- if (terminated) {
- return;
- }
- fineLog("Executing HDFS Region janitor task", null);
-
- Collection<HdfsRegionManager> regions = regionManagerMap.values();
- for (HdfsRegionManager region : regions) {
- fineLog("Maintaining region:" + region.getRegionFolder(), null);
- try {
- region.performMaintenance();
- } catch (Throwable e) {
- logger.info(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR , region.getRegionFolder()));
- logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, e.getMessage()));
- fineLog(null, e);
- }
- }
- }
-
- public void terminate() {
- terminated = true;
- }
- }
-
- protected static void fineLog(String message, Throwable e) {
- if(logger.isDebugEnabled()) {
- logger.debug(message, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
deleted file mode 100644
index 880ef3e..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-
-/**
- * HDFSStoreDirector is created for managing all instances of HDFSStoreImpl.
- *
- */
-public final class HDFSStoreDirector {
- private final ConcurrentHashMap<String, HDFSStoreImpl> storeMap = new ConcurrentHashMap<String, HDFSStoreImpl>();
-
- // singleton instance
- private static volatile HDFSStoreDirector instance;
-
- private HDFSStoreDirector() {
-
- }
-
- public static final HDFSStoreDirector getInstance() {
- if (instance == null) {
- synchronized (HDFSStoreDirector.class) {
- if (instance == null)
- instance = new HDFSStoreDirector();
- }
- }
- return instance;
- }
-
- // Called when the region is created.
- public final void addHDFSStore(HDFSStoreImpl hdfsStore){
- this.storeMap.put(hdfsStore.getName(), hdfsStore);
- }
-
- public final HDFSStoreImpl getHDFSStore(String hdfsStoreName) {
- return this.storeMap.get(hdfsStoreName);
- }
-
- public final void removeHDFSStore(String hdfsStoreName) {
- this.storeMap.remove(hdfsStoreName);
- }
-
- public void closeHDFSStores() {
- Iterator<HDFSStoreImpl> it = this.storeMap.values().iterator();
- while (it.hasNext()) {
- HDFSStoreImpl hsi = it.next();
- hsi.close();
- }
- this.storeMap.clear();
- }
-
- public ArrayList<HDFSStoreImpl> getAllHDFSStores() {
- ArrayList<HDFSStoreImpl> hdfsStores = new ArrayList<HDFSStoreImpl>();
- hdfsStores.addAll(this.storeMap.values());
- return hdfsStores;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
deleted file mode 100644
index cbb35cb..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
-import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * Manages unsorted Hoplog files for a bucket (Streaming Ingest option). An instance per bucket
- * will exist in each PR
- *
- *
- */
-public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<UnsortedHoplogPersistedEvent> {
- public static final String HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
- + SEQ_HOPLOG_EXTENSION + "|" + TEMP_HOPLOG_EXTENSION + ")";
- public static final Pattern HOPLOG_PATTERN = Pattern.compile(HOPLOG_REGEX);
- protected static String TMP_FILE_NAME_REGEX = HOPLOG_NAME_REGEX + SEQ_HOPLOG_EXTENSION + TEMP_HOPLOG_EXTENSION + "$";
- protected static final Pattern patternForTmpHoplog = Pattern.compile(TMP_FILE_NAME_REGEX);
-
- volatile private HoplogWriter writer;
- volatile private Hoplog currentHoplog;
-
- volatile private long lastFlushTime = System.currentTimeMillis();
-
- volatile private boolean abortFlush = false;
- private FileSystem fileSystem;
-
- public HDFSUnsortedHoplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
- super(region, bucketId);
- writer = null;
- sequence = new AtomicInteger(0);
-
- fileSystem = store.getFileSystem();
- if (! fileSystem.exists(bucketPath)) {
- return;
- }
-
- FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
- @Override
- public boolean accept(Path file) {
- // All valid hoplog files must match the regex
- Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
- return matcher.matches();
- }
- });
-
- if (validHoplogs != null && validHoplogs.length > 0) {
- for (FileStatus file : validHoplogs) {
- // account for the disk used by this file
- incrementDiskUsage(file.getLen());
- }
- }
-
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- if (logger.isDebugEnabled())
- logger.debug("{}Closing the hoplog organizer and the open files", logPrefix);
- // abort the flush so that we can immediately call the close current writer.
- abortFlush = true;
- synchronizedCloseWriter(true, 0, 0);
- }
-
-
- /**
- * Flushes the data to HDFS.
- * Synchronization ensures that the writer is not closed when flush is happening.
- * To abort the flush, abortFlush needs to be set.
- * @throws ForceReattemptException
- */
- @Override
- public synchronized void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, final int count)
- throws IOException, ForceReattemptException {
- assert bufferIter != null;
-
- if (abortFlush)
- throw new CacheClosedException("Either the region has been cleared " +
- "or closed. Aborting the ongoing flush operation.");
- if (logger.isDebugEnabled())
- logger.debug("{}Initializing flush operation", logPrefix);
-
- // variables for updating stats
- long start = stats.getFlush().begin();
- int byteCount = 0;
- if (writer == null) {
- // Hoplogs of sequence files are always created with a 0 sequence number
- currentHoplog = getTmpSortedOplog(0, SEQ_HOPLOG_EXTENSION);
- try {
- writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
- @Override
- public HoplogWriter call() throws Exception {
- return currentHoplog.createWriter(count);
- }
- });
- } catch (Exception e) {
- if (e instanceof IOException) {
- throw (IOException)e;
- }
- throw new IOException(e);
- }
- }
- long timeSinceLastFlush = (System.currentTimeMillis() - lastFlushTime)/1000 ;
-
- try {
- /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
- //HeapDataOutputStream out = new HeapDataOutputStream();
- while (bufferIter.hasNext()) {
- HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
- if (abortFlush) {
- stats.getFlush().end(byteCount, start);
- throw new CacheClosedException("Either the region has been cleared " +
- "or closed. Aborting the ongoing flush operation.");
- }
- QueuedPersistentEvent item = bufferIter.next();
- item.toHoplogEventBytes(out);
- byte[] valueBytes = out.toByteArray();
- writer.append(item.getRawKey(), valueBytes);
- // add key length and value length to stats byte counter
- byteCount += (item.getRawKey().length + valueBytes.length);
- /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
- //out.clearForReuse();
- }
- // ping secondaries before making the file a legitimate file to ensure
- // that in case of split brain, no other vm has taken up as primary. #50110.
- if (!abortFlush)
- pingSecondaries();
- // append completed. If the file is to be rolled over,
- // close writer and rename the file to a legitimate name.
- // Else, sync the already written data with HDFS nodes.
- int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024;
- int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval();
- if (writer.getCurrentSize() >= maxFileSize ||
- timeSinceLastFlush >= fileRolloverInterval) {
- closeCurrentWriter();
- }
- else {
- // if flush is not aborted, hsync the batch. It ensures that
- // the batch has reached HDFS and we can discard it.
- if (!abortFlush)
- writer.hsync();
- }
- } catch (IOException e) {
- stats.getFlush().error(start);
- // as there is an exception, it can be probably be a file specific problem.
- // close the current file to avoid any file specific issues next time
- closeCurrentWriter();
- // throw the exception so that async queue will dispatch the same batch again
- throw e;
- }
-
- stats.getFlush().end(byteCount, start);
- }
-
- /**
- * Synchronization ensures that the writer is not closed when flush is happening.
- */
- synchronized void synchronizedCloseWriter(boolean forceClose,
- long timeSinceLastFlush, int minsizeforrollover) throws IOException {
- long writerSize = 0;
- if (writer != null){
- writerSize = writer.getCurrentSize();
- }
-
- if (writerSize < (minsizeforrollover * 1024L))
- return;
-
- int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024;
- int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval();
- if (writerSize >= maxFileSize ||
- timeSinceLastFlush >= fileRolloverInterval || forceClose) {
- closeCurrentWriter();
- }
- }
-
-
- /**
- * Closes the current writer so that next time a new hoplog can
- * be created. Also, fixes any tmp hoplogs.
- *
- * @throws IOException
- */
- void closeCurrentWriter() throws IOException {
-
- if (writer != null) {
- // If this organizer is closing, it is ok to ignore exceptions here
- // because CloseTmpHoplogsTimerTask
- // on another member may have already renamed the hoplog
- // fixes bug 49141
- boolean isClosing = abortFlush;
- try {
- incrementDiskUsage(writer.getCurrentSize());
- } catch (IOException e) {
- if (!isClosing) {
- throw e;
- }
- }
- if (logger.isDebugEnabled())
- logger.debug("{}Closing hoplog " + currentHoplog.getFileName(), logPrefix);
- try{
- writer.close();
- makeLegitimate(currentHoplog);
- } catch (IOException e) {
- if (!isClosing) {
- logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
- throw e;
- }
- } finally {
- writer = null;
- lastFlushTime = System.currentTimeMillis();
- }
- }
- else
- lastFlushTime = System.currentTimeMillis();
- }
-
- @Override
- public void clear() throws IOException {
- boolean prevAbortFlushFlag = abortFlush;
- // abort the flush so that we can immediately call the close current writer.
- abortFlush = true;
-
- // Close if there is any existing writer.
- try {
- synchronizedCloseWriter(true, 0, 0);
- } catch (IOException e) {
- logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
- }
-
- // reenable the aborted flush
- abortFlush = prevAbortFlushFlag;
-
- // Mark the hoplogs for deletion
- markHoplogsForDeletion();
-
- }
-
- @Override
- public void performMaintenance() {
- // TODO remove the timer for tmp file conversion. Use this instead
- }
-
- @Override
- public Future<CompactionStatus> forceCompaction(boolean isMajor) {
- return null;
- }
-
- @Override
- protected Hoplog getHoplog(Path hoplogPath) throws IOException {
- Hoplog so = new SequenceFileHoplog(fileSystem, hoplogPath, stats);
- return so;
- }
-
- /**
- * Fixes the size of hoplogs that were not closed properly last time.
- * Such hoplogs are *.tmphop files. Identify them and open them and close
- * them, this fixes the size. After doing this rename them to *.hop.
- *
- * @throws IOException
- * @throws ForceReattemptException
- */
- void identifyAndFixTmpHoplogs(FileSystem fs) throws IOException, ForceReattemptException {
- if (logger.isDebugEnabled())
- logger.debug("{}Fixing temporary hoplogs", logPrefix);
-
- // A different filesystem is passed to this function for the following reason:
- // For HDFS, if a file wasn't closed properly last time,
- // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
- // FSNamesystem.recoverLeaseInternal function gets called.
- // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file,
- // created using the same FileSystem object. This is a bug and is being tracked at:
- // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
- //
- // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug,
- // we create a new file system for the timer task so that it does not encounter the bug.
-
- FileStatus tmpHoplogs[] = FSUtils.listStatus(fs, fs.makeQualified(bucketPath), new PathFilter() {
- @Override
- public boolean accept(Path file) {
- // All valid hoplog files must match the regex
- Matcher matcher = patternForTmpHoplog.matcher(file.getName());
- return matcher.matches();
- }
- });
-
- if (tmpHoplogs == null || tmpHoplogs.length == 0) {
- if (logger.isDebugEnabled())
- logger.debug("{}No files to fix", logPrefix);
- return;
- }
- // ping secondaries so that in case of split brain, no other vm has taken up
- // as primary. #50110.
- pingSecondaries();
- if (logger.isDebugEnabled())
- logger.debug("{}Files to fix " + tmpHoplogs.length, logPrefix);
-
- String currentHoplogName = null;
- // get the current hoplog name. We need to ignore current hoplog while fixing.
- if (currentHoplog != null) {
- currentHoplogName = currentHoplog.getFileName();
- }
-
- for (int i = 0; i < tmpHoplogs.length; i++) {
- // Skip directories
- if (tmpHoplogs[i].isDirectory()) {
- continue;
- }
-
- final Path p = tmpHoplogs[i].getPath();
-
- if (tmpHoplogs[i].getPath().getName().equals(currentHoplogName)){
- if (logger.isDebugEnabled())
- logger.debug("Skipping current file: " + tmpHoplogs[i].getPath().getName(), logPrefix);
- continue;
- }
-
- SequenceFileHoplog hoplog = new SequenceFileHoplog(fs, p, stats);
- try {
- makeLegitimate(hoplog);
- logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " was a temporary " +
- "hoplog because the node managing it wasn't shutdown properly last time. Fixed the hoplog name."));
- } catch (IOException e) {
- logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " is still a temporary " +
- "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
- "change the hoplog name because an exception was thrown while fixing it. " + e));
- }
- }
- }
-
- /**
-  * Lists the entries under {@code bucketPath} whose file name ends with
-  * {@code EXPIRED_HOPLOG_EXTENSION}.
-  *
-  * @return the result of {@code FSUtils.listStatus} for the filtered listing
-  * @throws IOException if the listing call fails
-  */
- private FileStatus[] getExpiredHoplogs() throws IOException {
-   PathFilter expiredOnly = new PathFilter() {
-     @Override
-     public boolean accept(Path candidate) {
-       // Only the extension is inspected; the name is not matched against any pattern here.
-       return candidate.getName().endsWith(EXPIRED_HOPLOG_EXTENSION);
-     }
-   };
-   return FSUtils.listStatus(fileSystem, bucketPath, expiredOnly);
- }
- /**
-  * Adds an expiry marker for each hoplog selected from {@code bucketPath}.
-  * <p>
-  * Files whose names match {@code HOPLOG_PATTERN} are listed, then narrowed by
-  * {@code filterValidHoplogs} using the listing of already-expired files
-  * (presumably dropping hoplogs that already have a marker; confirm against
-  * that helper). Each remaining file is passed to
-  * {@code addExpiryMarkerForAFile}. A failure on one file does not stop the
-  * others; every collected failure is logged as a warning once the loop ends.
-  *
-  * @throws IOException if listing the directory fails
-  */
- private void markHoplogsForDeletion() throws IOException {
-   PathFilter hoplogNamesOnly = new PathFilter() {
-     @Override
-     public boolean accept(Path candidate) {
-       // Accept only names that fully match the hoplog pattern.
-       return HOPLOG_PATTERN.matcher(candidate.getName()).matches();
-     }
-   };
-   FileStatus[] candidates = FSUtils.listStatus(fileSystem, bucketPath, hoplogNamesOnly);
-   candidates = filterValidHoplogs(candidates, getExpiredHoplogs());
-   if (candidates == null || candidates.length == 0) {
-     return;
-   }
-
-   ArrayList<IOException> failures = new ArrayList<IOException>();
-   for (int i = 0; i < candidates.length; i++) {
-     try {
-       addExpiryMarkerForAFile(getHoplog(candidates[i].getPath()));
-     } catch (IOException e) {
-       // Keep going with the remaining hoplogs; report everything afterwards.
-       failures.add(e);
-     }
-   }
-
-   for (IOException failure : failures) {
-     logger.warn(LocalizedStrings.HOPLOG_HOPLOG_REMOVE_FAILED, failure);
-   }
- }
-
- /**
-  * Unsupported by this implementation.
-  *
-  * @throws UnsupportedOperationException on every call
-  */
- @Override
- public Compactor getCompactor() {
-   final String owner = getClass().getSimpleName();
-   throw new UnsupportedOperationException("Not supported for " + owner);
- }
-
- @Override
- public HoplogIterator<byte[], UnsortedHoplogPersistedEvent> scan(
- long startOffset, long length) throws IOException {
- throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
- }
-
- /**
-  * @return the current value of the {@code lastFlushTime} field
-  */
- public long getLastFlushTime() {
-   return lastFlushTime;
- }
-
- /**
-  * @return the store's write-only file rollover interval, widened to {@code long}
-  */
- public long getfileRolloverInterval() {
-   return this.store.getWriteOnlyFileRolloverInterval();
- }
-
- /**
-  * Unsupported by this implementation.
-  *
-  * @throws UnsupportedOperationException on every call
-  */
- @Override
- public long getLastMajorCompactionTimestamp() {
-   // Same message format as the other unsupported operations in this class.
-   throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
- }
-
-}