You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:35 UTC
[047/100] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
index 9890b3b,0000000..61b925b
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
@@@ -1,2007 -1,0 +1,2007 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
++import com.gemstone.gemfire.internal.hll.CardinalityMergeException;
++import com.gemstone.gemfire.internal.hll.HyperLogLog;
++import com.gemstone.gemfire.internal.hll.ICardinality;
++import com.gemstone.gemfire.internal.hll.MurmurHash;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import com.gemstone.gemfire.InternalGemFireException;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.CardinalityMergeException;
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.HyperLogLog;
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.MurmurHash;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReaderActivityListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.Meta;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.IOOperation;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Manages sorted oplog files for a bucket. An instance per bucket will exist in
+ * each PR
+ *
+ * @author ashvina
+ */
+public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHoplogPersistedEvent> {
+ public static final int AVG_NUM_KEYS_PER_INDEX_BLOCK = 200;
+
+ // all valid sorted hoplogs will follow the following name pattern
+ public static final String SORTED_HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
+ + FLUSH_HOPLOG_EXTENSION + "|" + MINOR_HOPLOG_EXTENSION + "|"
+ + MAJOR_HOPLOG_EXTENSION + ")";
+ public static final Pattern SORTED_HOPLOG_PATTERN = Pattern.compile(SORTED_HOPLOG_REGEX);
+
+ //Amount of time before deleting old temporary files
+ final long TMP_FILE_EXPIRATION_TIME_MS = Long.getLong(HoplogConfig.TMP_FILE_EXPIRATION, HoplogConfig.TMP_FILE_EXPIRATION_DEFAULT);
+
+ // NOTE(review): mutable static, overwritten from the constructor via a
+ // system property; all organizer instances share the last value written.
+ static float RATIO = HoplogConfig.COMPACTION_FILE_RATIO_DEFAULT;
+
+ // Compacter for this bucket
+ private Compactor compacter;
+
+ private final HoplogReadersController hoplogReadersController;
+ // wall-clock time of the last expired-file cleanup; MIN_VALUE means "never"
+ private AtomicLong previousCleanupTimestamp = new AtomicLong(Long.MIN_VALUE);
+
+ /**
+ * The default HLL constant. gives an accuracy of about 3.25%
+ * public only for testing upgrade from 1.3 to 1.4
+ */
+ public static double HLL_CONSTANT = 0.03;
+ /**
+ * This estimator keeps track of this buckets entry count. This value is
+ * affected by flush and compaction cycles
+ */
+ private volatile ICardinality bucketSize = new HyperLogLog(HLL_CONSTANT);
+ //A set of tmp files that existed when this bucket organizer was originally
+ //created. These may still be open by the old primary, or they may be
+ //abandoned files.
+ private LinkedList<FileStatus> oldTmpFiles;
+
+ // tmp hoplogs that failed to flush; cleaned up later by maintenance
+ private ConcurrentMap<Hoplog, Boolean> tmpFiles = new ConcurrentHashMap<Hoplog, Boolean>();
+
+ // set under changePrimarylockObject when the bucket moves/closes
+ protected volatile boolean organizerClosed = false;
+
+ /**
+ * For the 1.4 release we are changing the HLL_CONSTANT which will make the
+ * old persisted HLLs incompatible with the new HLLs. To fix this we will
+ * force a major compaction when the system starts up so that we will only
+ * have new HLLs in the system (see bug 51403)
+ */
+ private boolean startCompactionOnStartup = false;
+
+ /**
+ * @param region
+ * Region manager instance. Instances of hdfs listener instance,
+ * stats collector, file system, etc are shared by all buckets of a
+ * region and provided by region manager instance
+ * @param bucketId bucket id to be managed by this organizer
+ * @throws IOException
+ */
+ public HdfsSortedOplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
+ super(region, bucketId);
+
+ // Optional system-property override of the compaction file ratio; a
+ // missing or unparsable value silently keeps the current default.
+ String val = System.getProperty(HoplogConfig.COMPACTION_FILE_RATIO);
+ try {
+ RATIO = Float.parseFloat(val);
+ } catch (Exception e) {
+ // intentional: fall back to RATIO's existing value
+ }
+
+ hoplogReadersController = new HoplogReadersController();
+
+ // initialize with all the files in the directory
+ List<Hoplog> hoplogs = identifyAndLoadSortedOplogs(true);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Initializing bucket with existing hoplogs, count = " + hoplogs.size(), logPrefix);
+ }
+ for (Hoplog hoplog : hoplogs) {
+ addSortedOplog(hoplog, false, true);
+ }
+
+ // initialize sequence to the current maximum
+ sequence = new AtomicInteger(findMaxSequenceNumber(hoplogs));
+
+ initOldTmpFiles();
+
+ // publish the cleanup (purge) interval in millis next to the store's home
+ // dir once, so other components can discover it
+ FileSystem fs = store.getFileSystem();
+ Path cleanUpIntervalPath = new Path(store.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+ if (!fs.exists(cleanUpIntervalPath)) {
+ long intervalDurationMillis = store.getPurgeInterval() * 60 * 1000;
+ HoplogUtil.exposeCleanupIntervalMillis(fs, cleanUpIntervalPath, intervalDurationMillis);
+ }
+
+ // mergeHLL() sets startCompactionOnStartup when it encounters a persisted
+ // HLL it cannot merge (pre-1.4 format); rewrite those files now
+ if (startCompactionOnStartup) {
+ forceCompactionOnVersionUpgrade();
+ if (logger.isInfoEnabled()) {
+ logger.info(LocalizedStrings.HOPLOG_MAJOR_COMPACTION_SCHEDULED_FOR_BETTER_ESTIMATE);
+ }
+ }
+ }
+
+ /**
+ * Iterates on the input buffer and persists it in a new sorted oplog. This operation is
+ * synchronous and blocks the thread.
+ *
+ * @param iterator events to persist; consumed until exhausted or the organizer closes
+ * @param count expected number of entries, used to size the hoplog writer
+ * @throws IOException on write/rename failure (translated via handleWriteHdfsIOError)
+ */
+ @Override
+ public void flush(Iterator<? extends QueuedPersistentEvent> iterator, final int count)
+ throws IOException, ForceReattemptException {
+ assert iterator != null;
+
+ if (logger.isDebugEnabled())
+ logger.debug("{}Initializing flush operation", logPrefix);
+
+ // create a tmp file; it is renamed ("made legitimate") only after a
+ // successful write so readers never see a partial hoplog
+ final Hoplog so = getTmpSortedOplog(null, FLUSH_HOPLOG_EXTENSION);
+ HoplogWriter writer = null;
+ // per-hoplog cardinality estimate, persisted in the hoplog's metadata
+ ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
+
+ // variables for updating stats
+ long start = stats.getFlush().begin();
+ int byteCount = 0;
+
+ try {
+ /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
+ //HeapDataOutputStream out = new HeapDataOutputStream();
+
+ // writer creation is serialized store-wide through the singleton writer
+ try {
+ writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
+ @Override
+ public HoplogWriter call() throws Exception {
+ return so.createWriter(count);
+ }
+ });
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException)e;
+ }
+ throw new IOException(e);
+ }
+
+ // serialize each event and append it; deleted entries are written but
+ // not counted in the cardinality estimate
+ while (iterator.hasNext() && !this.organizerClosed) {
+ HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
+
+ QueuedPersistentEvent item = iterator.next();
+ item.toHoplogEventBytes(out);
+ byte[] valueBytes = out.toByteArray();
+ writer.append(item.getRawKey(), valueBytes);
+
+ // add key length and value length to stats byte counter
+ byteCount += (item.getRawKey().length + valueBytes.length);
+
+ // increment size only if entry is not deleted
+ if (!isDeletedEntry(valueBytes, 0)) {
+ int hash = MurmurHash.hash(item.getRawKey());
+ localHLL.offerHashed(hash);
+ }
+ /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
+ //out.clearForReuse();
+ }
+ if (organizerClosed)
+ throw new BucketMovedException("The current bucket is moved BucketID: "+
+ this.bucketId + " Region name: " + this.regionManager.getRegion().getName());
+
+ // append completed. provide cardinality and close writer
+ writer.close(buildMetaData(localHLL));
+ writer = null;
+ } catch (IOException e) {
+ stats.getFlush().error(start);
+ try {
+ e = handleWriteHdfsIOError(writer, so, e);
+ } finally {
+ //Set the writer to null because handleWriteHDFSIOError has
+ //already closed the writer.
+ writer = null;
+ }
+ throw e;
+ } catch (BucketMovedException e) {
+ stats.getFlush().error(start);
+ deleteTmpFile(writer, so);
+ writer = null;
+ throw e;
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ try{
+
+ // ping secondaries before making the file a legitimate file to ensure
+ // that in case of split brain, no other vm has taken up as primary. #50110.
+ pingSecondaries();
+
+ // rename file and check if renaming was successful
+ synchronized (changePrimarylockObject) {
+ if (!organizerClosed)
+ makeLegitimate(so);
+ else
+ throw new BucketMovedException("The current bucket is moved BucketID: "+
+ this.bucketId + " Region name: " + this.regionManager.getRegion().getName());
+ }
+ // getSize() throwing IllegalStateException is used as the signal that
+ // the rename did not take effect
+ try {
+ so.getSize();
+ } catch (IllegalStateException e) {
+ throw new IOException("Failed to rename hoplog file:" + so.getFileName());
+ }
+
+ //Disabling this assertion due to bug 49740
+ // check to make sure the sequence number is correct
+// if (ENABLE_INTEGRITY_CHECKS) {
+// Assert.assertTrue(getSequenceNumber(so) == findMaxSequenceNumber(identifyAndLoadSortedOplogs(false)),
+// "Invalid sequence number detected for " + so.getFileName());
+// }
+
+ // record the file for future maintenance and reads
+ addSortedOplog(so, false, true);
+ stats.getFlush().end(byteCount, start);
+ incrementDiskUsage(so.getSize());
+ } catch (BucketMovedException e) {
+ stats.getFlush().error(start);
+ deleteTmpFile(writer, so);
+ writer = null;
+ throw e;
+ } catch (IOException e) {
+ stats.getFlush().error(start);
+ logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
+ throw e;
+ }
+
+ // a new hoplog may make minor/major compaction worthwhile
+ submitCompactionRequests();
+ }
+
+
+ /**
+ * store cardinality information in metadata
+ * @param localHLL the hll estimate for this hoplog only
+ * @return a metadata map carrying the serialized HLL under
+ * LOCAL_CARDINALITY_ESTIMATE_V2
+ */
+ private EnumMap<Meta, byte[]> buildMetaData(ICardinality localHLL) throws IOException {
+ EnumMap<Meta, byte[]> map = new EnumMap<Hoplog.Meta, byte[]>(Meta.class);
+ map.put(Meta.LOCAL_CARDINALITY_ESTIMATE_V2, localHLL.getBytes());
+ return map;
+ }
+
+ /**
+ * Submits compaction work for this bucket to the compaction manager:
+ * a major request only when the store enables it and the oldest hoplog is
+ * old enough, and a minor request whenever the store enables minor
+ * compaction (the manager ignores it if there is nothing to do).
+ */
+ private void submitCompactionRequests() throws IOException {
+ CompactionRequest req;
+
+ // determine if a major compaction is needed and create a compaction request
+ // with compaction manager
+ if (store.getMajorCompaction()) {
+ if (isMajorCompactionNeeded()) {
+ req = new CompactionRequest(regionFolder, bucketId, getCompactor(), true);
+ HDFSCompactionManager.getInstance(store).submitRequest(req);
+ }
+ }
+
+ // submit a minor compaction task. It will be ignored if there is no work to
+ // be done.
+ if (store.getMinorCompaction()) {
+ req = new CompactionRequest(regionFolder, bucketId, getCompactor(), false);
+ HDFSCompactionManager.getInstance(store).submitRequest(req);
+ }
+ }
+
+ /**
+ * @return true if the oldest hoplog was created 1 major compaction interval ago
+ */
+ private boolean isMajorCompactionNeeded() throws IOException {
+ // major compaction interval in milliseconds
+
+ long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
+
+ // no hoplogs at all -> nothing to compact
+ Hoplog oplog = hoplogReadersController.getOldestHoplog();
+ if (oplog == null) {
+ return false;
+ }
+
+ long oldestFileTime = oplog.getModificationTimeStamp();
+ long now = System.currentTimeMillis();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Checking oldest hop " + oplog.getFileName()
+ + " for majorCompactionInterval=" + majorCInterval
+ + " + now=" + now, logPrefix);
+ }
+ // a timestamp of 0 (unknown) never triggers compaction
+ if (oldestFileTime > 0l && oldestFileTime < (now - majorCInterval)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Looks up {@code key} by probing the tracked hoplogs from youngest to
+ * oldest; the first hoplog containing the key wins.
+ *
+ * @return the deserialized event, or null if the key is in no hoplog
+ */
+ @Override
+ public SortedHoplogPersistedEvent read(byte[] key) throws IOException {
+ long startTime = stats.getRead().begin();
+ // user tag is only used for debug-level reference tracking
+ String user = logger.isDebugEnabled() ? "Read" : null;
+
+ // collect snapshot of hoplogs
+ List<TrackedReference<Hoplog>> hoplogs = null;
+ hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+ try {
+ // search for the key in order starting with the youngest oplog
+ for (TrackedReference<Hoplog> hoplog : hoplogs) {
+ HoplogReader reader = hoplog.get().getReader();
+ byte[] val = reader.read(key);
+ if (val != null) {
+ // value found in a younger hoplog. stop iteration
+ SortedHoplogPersistedEvent eventObj = deserializeValue(val);
+ stats.getRead().end(val.length, startTime);
+ return eventObj;
+ }
+ }
+ } catch (IllegalArgumentException e) {
+ // some reader layers wrap IOExceptions in IllegalArgumentException;
+ // unwrap and translate those, rethrow anything else
+ if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
+ throw handleIOError((IOException) e.getCause());
+ } else {
+ throw e;
+ }
+ } catch (IOException e) {
+ throw handleIOError(e);
+ } catch (HDFSIOException e) {
+ throw handleIOError(e);
+ } finally {
+ // always release the snapshot so expired hoplogs can be cleaned up
+ hoplogReadersController.releaseHoplogs(hoplogs, user);
+ }
+
+ stats.getRead().end(0, startTime);
+ return null;
+ }
+
+ /**
+ * Translates an IOException raised during bucket IO: unwraps Hadoop
+ * RemoteExceptions and converts "safe" errors (member shutdown, organizer
+ * closed) into unchecked exceptions via checkForSafeError. Otherwise the
+ * (possibly unwrapped) exception is returned for the caller to throw.
+ */
+ protected IOException handleIOError(IOException e) {
+ // expose the error wrapped inside remote exception
+ if (e instanceof RemoteException) {
+ return ((RemoteException) e).unwrapRemoteException();
+ }
+
+ checkForSafeError(e);
+
+ // it is not a safe error. let the caller handle it
+ return e;
+ }
+
+ // same translation for the HDFSIOException flavor; no unwrapping needed
+ protected HDFSIOException handleIOError(HDFSIOException e) {
+ checkForSafeError(e);
+ return e;
+ }
+
+ /**
+ * Rethrows {@code e} as CacheClosedException when the filesystem is shutting
+ * down, or as PrimaryBucketException when this organizer is closed (so the
+ * caller retries on the new primary). Returns normally otherwise.
+ */
+ protected void checkForSafeError(Exception e) {
+ boolean safeError = ShutdownHookManager.get().isShutdownInProgress();
+ if (safeError) {
+ // IOException because of closed file system. This happens when member is
+ // shutting down
+ if (logger.isDebugEnabled())
+ logger.debug("IO error caused by filesystem shutdown", e);
+ throw new CacheClosedException("IO error caused by filesystem shutdown", e);
+ }
+
+ if(isClosed()) {
+ //If the hoplog organizer is closed, throw an exception to indicate the
+ //caller should retry on the new primary.
+ throw new PrimaryBucketException(e);
+ }
+ }
+
+ /**
+ * Handles a write failure on a tmp hoplog: closes the writer, queues the
+ * tmp file for later janitor cleanup (see cleanupTmpFiles), and translates
+ * the exception via handleIOError.
+ */
+ protected IOException handleWriteHdfsIOError(HoplogWriter writer, Hoplog so, IOException e)
+ throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Handle write error:" + so, logPrefix);
+ }
+
+ closeWriter(writer);
+ // add to the janitor queue
+ tmpFiles.put(so, Boolean.TRUE);
+
+ return handleIOError(e);
+ }
+
+ // Best-effort close-and-delete of a tmp hoplog; delete failures are logged
+ // and swallowed.
+ private void deleteTmpFile(HoplogWriter writer, Hoplog so) {
+ closeWriter(writer);
+
+ // delete the temporary hoplog
+ try {
+ if (so != null) {
+ so.delete();
+ }
+ } catch (IOException e1) {
+ logger.info(e1);
+ }
+ }
+
+ // Best-effort close; errors are only logged (and only while the organizer
+ // is still open) because close can fail when datanodes are unreachable.
+ private void closeWriter(HoplogWriter writer) {
+ if (writer != null) {
+ // close writer before deleting it
+ try {
+ writer.close();
+ } catch (Throwable e1) {
+ // error to close hoplog will happen if no connections to datanode are
+ // available. Try to delete the file on namenode
+ if(!isClosed()) {
+ logger.info(e1);
+ }
+ }
+ }
+ }
+
+ /**
+ * Closes hoplog and suppresses IO during reader close. Suppressing IO errors
+ * when the organizer is closing or an hoplog becomes inactive lets the system
+ * continue freeing other resources. It could potentially lead to socket
+ * leaks though.
+ */
+ private void closeReaderAndSuppressError(Hoplog hoplog, boolean clearCache) {
+ // NOTE(review): clearCache is currently unused in this method body
+ try {
+ hoplog.close();
+ } catch (IOException e) {
+ // expose the error wrapped inside remote exception
+ if (e instanceof RemoteException) {
+ e = ((RemoteException) e).unwrapRemoteException();
+ }
+ logger.info(e);
+ }
+ }
+
+ /**
+ * Returns an iterator over a snapshot of the current hoplogs. The tracked
+ * references are handed to the iterator, which releases them on close.
+ */
+ @Override
+ public BucketIterator scan() throws IOException {
+ String user = logger.isDebugEnabled() ? "Scan" : null;
+ List<TrackedReference<Hoplog>> hoplogs = null;
+ BucketIterator iter = null;
+ try {
+ hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+ iter = new BucketIterator(hoplogs);
+ return iter;
+ } finally {
+ // Normally the hoplogs will be released when the iterator is closed. The
+ // hoplogs must be released only if creating the iterator has failed.
+ if (iter == null) {
+ hoplogReadersController.releaseHoplogs(hoplogs, user);
+ }
+ }
+ }
+
+ // range scans are not supported by this organizer
+ @Override
+ public BucketIterator scan(byte[] from, byte[] to) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public BucketIterator scan(byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ // offset-based scans are not supported by this organizer
+ @Override
+ public HoplogIterator<byte[], SortedHoplogPersistedEvent> scan(
+ long startOffset, long length) throws IOException {
+ throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+ }
+
+ /**
+ * Closes this organizer: marks it closed (under the primary-change lock so
+ * an in-flight flush observes it), suspends compaction, closes all readers
+ * and resets the cleanup timestamp.
+ */
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ synchronized (changePrimarylockObject) {
+ organizerClosed = true;
+ }
+ //Suspend compaction
+ getCompactor().suspend();
+
+ //Close the readers controller.
+ hoplogReadersController.close();
+
+ previousCleanupTimestamp.set(Long.MIN_VALUE);
+
+ }
+
+ /**
+ * This method call will happen on secondary node. The secondary node needs to update its data
+ * structures
+ */
+ @Override
+ public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
+ throws IOException {
+ for (Hoplog oplog : oplogs) {
+ addSortedOplog(oplog, false, true);
+ }
+ }
+
+ // approximate entry count for this bucket from the merged HLL estimator
+ @Override
+ public long sizeEstimate() {
+ return this.bucketSize.cardinality();
+ }
+
+ /**
+ * Registers a hoplog with the readers controller, optionally folds its
+ * persisted cardinality estimate into the bucket-level HLL, and optionally
+ * notifies the listener.
+ *
+ * @param notify when true, fire listener.hoplogCreated
+ * @param addsToBucketSize when true, merge the hoplog's HLL into bucketSize
+ */
+ private void addSortedOplog(Hoplog so, boolean notify, boolean addsToBucketSize)
+ throws IOException {
+ if (!hoplogReadersController.addSortedOplog(so)) {
+ so.close();
+ throw new InternalGemFireException("Failed to add " + so);
+ }
+
+ String user = logger.isDebugEnabled() ? "Add" : null;
+ if (addsToBucketSize) {
+ TrackedReference<Hoplog> ref = null;
+ try {
+ ref = hoplogReadersController.trackHoplog(so, user);
+ // NOTE(review): the lock object is the current bucketSize reference,
+ // which reEstimateBucketSize() replaces; verify this guard is adequate
+ synchronized (bucketSize) {
+ ICardinality localHLL = ref.get().getEntryCountEstimate();
+ if (localHLL != null) {
+ bucketSize = mergeHLL(bucketSize, localHLL);
+ }
+ }
+ } finally {
+ if (ref != null) {
+ hoplogReadersController.releaseHoplog(ref, user);
+ }
+ }
+ }
+
+ if (notify && listener != null) {
+ listener.hoplogCreated(regionFolder, bucketId, so);
+ }
+ }
+
+ /**
+ * Rebuilds the bucket-level cardinality estimate from scratch by merging
+ * the per-hoplog estimates of all currently tracked hoplogs.
+ */
+ private void reEstimateBucketSize() throws IOException {
+ ICardinality global = null;
+ String user = logger.isDebugEnabled() ? "HLL" : null;
+ List<TrackedReference<Hoplog>> hoplogs = null;
+ try {
+ hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+ global = new HyperLogLog(HLL_CONSTANT);
+ for (TrackedReference<Hoplog> hop : hoplogs) {
+ global = mergeHLL(global, hop.get().getEntryCountEstimate());
+ }
+ } finally {
+ hoplogReadersController.releaseHoplogs(hoplogs, user);
+ }
+ bucketSize = global;
+ }
+
+ /**
+ * Merges a per-hoplog HLL into the running estimate. A merge failure
+ * (incompatible pre-1.4 HLL format) is tolerated: the local estimate is
+ * dropped and a major compaction is scheduled at startup to rewrite it.
+ */
+ protected ICardinality mergeHLL(ICardinality global, ICardinality local)
+ /*throws IOException*/ {
+ try {
+ return global.merge(local);
+ } catch (CardinalityMergeException e) {
+ // uncomment this after the 1.4 release
+ //throw new InternalGemFireException(e.getLocalizedMessage(), e);
+ startCompactionOnStartup = true;
+ return global;
+ }
+ }
+
+ /**
+ * Unregisters a hoplog from the readers controller and optionally notifies
+ * the listener of the deletion.
+ */
+ private void removeSortedOplog(TrackedReference<Hoplog> so, boolean notify) throws IOException {
+ hoplogReadersController.removeSortedOplog(so);
+
+ // release lock before notifying listeners
+ if (notify && listener != null) {
+ listener.hoplogDeleted(regionFolder, bucketId, so.get());
+ }
+ }
+
+ // forward a compaction-completed event to the listener
+ private void notifyCompactionListeners(boolean isMajor) {
+ listener.compactionCompleted(regionFolder, bucketId, isMajor);
+ }
+
+ /**
+ * This method call will happen on secondary node. The secondary node needs to update its data
+ * structures
+ * @throws IOException
+ */
+ @Override
+ public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ // lazily creates the per-bucket compactor; synchronized for single creation
+ @Override
+ public synchronized Compactor getCompactor() {
+ if (compacter == null) {
+ compacter = new HoplogCompactor();
+ }
+ return compacter;
+ }
+
+ // factory hook: this organizer always materializes HFile-backed hoplogs
+ @Override
+ protected Hoplog getHoplog(Path hoplogPath) throws IOException {
+ Hoplog so = new HFileSortedOplog(store, hoplogPath, store.getBlockCache(), stats, store.getStats());
+ return so;
+ }
+
+ /**
+ * locks sorted oplogs collection, removes oplog and renames for deletion later
+ * Iterates the targets from last to first; each hoplog is unregistered and,
+ * if it still exists on the file system, marked with an expiry marker so the
+ * cleanup pass can delete it after the purge interval.
+ * @throws IOException
+ */
+ void markSortedOplogForDeletion(List<TrackedReference<Hoplog>> targets, boolean notify) throws IOException {
+ for (int i = targets.size(); i > 0; i--) {
+ TrackedReference<Hoplog> so = targets.get(i - 1);
+ removeSortedOplog(so, true);
+ if (!store.getFileSystem().exists(new Path(bucketPath, so.get().getFileName()))) {
+ // the hoplog does not even exist on file system. Skip remaining steps
+ continue;
+ }
+ addExpiryMarkerForAFile(so.get());
+ }
+ }
+
+ /**
+ * Deletes expired hoplogs and expiry markers from the file system. Calculates
+ * a target timestamp based on cleanup interval. Then gets list of target
+ * hoplogs. It also updates the disk usage state
+ *
+ * @return number of files deleted
+ */
+ synchronized int initiateCleanup() throws IOException {
+ int conf = store.getPurgeInterval();
+ // minutes to milliseconds
+ long intervalDurationMillis = conf * 60 * 1000;
+ // Any expired hoplog with timestamp less than targetTS is a delete
+ // candidate.
+ long targetTS = System.currentTimeMillis() - intervalDurationMillis;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Target timestamp for expired hoplog deletion " + targetTS, logPrefix);
+ }
+ // avoid too frequent cleanup invocations. Exit cleanup invocation if the
+ // previous cleanup was executed within 10% range of cleanup interval
+ if (previousCleanupTimestamp.get() > targetTS
+ && (previousCleanupTimestamp.get() - targetTS) < (intervalDurationMillis / 10)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Skip cleanup, previous " + previousCleanupTimestamp.get(), logPrefix);
+ }
+ return 0;
+ }
+
+ List<FileStatus> targets = getOptimizationTargets(targetTS);
+ return deleteExpiredFiles(targets);
+ }
+
+ /**
+ * Deletes each target file, decrementing the tracked disk usage per file.
+ * Aborts early (returning 0) if the bucket closes mid-iteration.
+ *
+ * @return number of files in the target list, or 0 if aborted/empty
+ */
+ protected int deleteExpiredFiles(List<FileStatus> targets) throws IOException {
+ if (targets == null) {
+ return 0;
+ }
+
+ for (FileStatus file : targets) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Deleting file: " + file.getPath(), logPrefix);
+ }
+ store.getFileSystem().delete(file.getPath(), false);
+
+ if (isClosed()) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Expiry file cleanup interupted by bucket close", logPrefix);
+ return 0;
+ }
+ incrementDiskUsage(-1 * file.getLen());
+ }
+
+ // record completion time so the next invocation can throttle itself
+ previousCleanupTimestamp.set(System.currentTimeMillis());
+ return targets.size();
+ }
+
+ /**
+ * @param ts
+ * target timestamp
+ * @return list of hoplogs, whose expiry markers were created before target
+ * timestamp, and the expiry marker itself.
+ * @throws IOException
+ */
+ protected List<FileStatus> getOptimizationTargets(long ts) throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Identifying optimization targets " + ts, logPrefix);
+ }
+
+ List<FileStatus> deleteTargets = new ArrayList<FileStatus>();
+ FileStatus[] markers = getExpiryMarkers();
+ if (markers != null) {
+ for (FileStatus marker : markers) {
+ String name = truncateExpiryExtension(marker.getPath().getName());
+ long timestamp = marker.getModificationTime();
+
+ // expired minor compacted files are not being used anywhere. These can
+ // be removed immediately. All the other expired files should be removed
+ // when the files have aged
+ boolean isTarget = false;
+
+ if (name.endsWith(MINOR_HOPLOG_EXTENSION)) {
+ isTarget = true;
+ } else if (timestamp < ts && name.endsWith(FLUSH_HOPLOG_EXTENSION)) {
+ isTarget = true;
+ } else if (timestamp < ts && name.endsWith(MAJOR_HOPLOG_EXTENSION)) {
+ // major-compacted files get extra grace: one full major compaction
+ // interval beyond the marker's age
+ long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
+ if (timestamp < (System.currentTimeMillis() - majorCInterval)) {
+ isTarget = true;
+ }
+ }
+ if (!isTarget) {
+ continue;
+ }
+
+ // if the file is still being read, do not delete or rename it
+ TrackedReference<Hoplog> used = hoplogReadersController.getInactiveHoplog(name);
+ if (used != null) {
+ if (used.inUse() && logger.isDebugEnabled()) {
+ logger.debug("{}Optimizer: found active expired hoplog:" + name, logPrefix);
+ } else if (logger.isDebugEnabled()) {
+ logger.debug("{}Optimizer: found open expired hoplog:" + name, logPrefix);
+ }
+ continue;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Delete target identified " + marker.getPath(), logPrefix);
+ }
+
+ // delete both the marker and, when still present, the hoplog it marks
+ deleteTargets.add(marker);
+ Path hoplogPath = new Path(bucketPath, name);
+ if (store.getFileSystem().exists(hoplogPath)) {
+ FileStatus hoplog = store.getFileSystem().getFileStatus(hoplogPath);
+ deleteTargets.add(hoplog);
+ }
+ }
+ }
+ return deleteTargets;
+ }
+
+ /**
+ * Returns a list of hoplogs present in the bucket's directory, expected to be called during
+ * hoplog set initialization. Skips directories, empty files, files that do
+ * not match the sorted-hoplog name pattern, and hoplogs already marked
+ * expired.
+ *
+ * @param countSize when true, every hoplog-named file's length is added to
+ * the tracked disk usage (even files that are later filtered out)
+ */
+ List<Hoplog> identifyAndLoadSortedOplogs(boolean countSize) throws IOException {
+ FileSystem fs = store.getFileSystem();
+ if (! fs.exists(bucketPath)) {
+ return new ArrayList<Hoplog>();
+ }
+
+ FileStatus allFiles[] = fs.listStatus(bucketPath);
+ ArrayList<FileStatus> validFiles = new ArrayList<FileStatus>();
+ for (FileStatus file : allFiles) {
+ // All hoplog files contribute to disk usage
+ Matcher matcher = HOPLOG_NAME_PATTERN.matcher(file.getPath().getName());
+ if (! matcher.matches()) {
+ // not a hoplog
+ continue;
+ }
+
+ // account for the disk used by this file
+ if (countSize) {
+ incrementDiskUsage(file.getLen());
+ }
+
+ // All valid hoplog files must match the regex
+ matcher = SORTED_HOPLOG_PATTERN.matcher(file.getPath().getName());
+ if (matcher.matches()) {
+ validFiles.add(file);
+ }
+ }
+
+ // drop hoplogs that have an expiry marker
+ FileStatus[] markers = getExpiryMarkers();
+ FileStatus[] validHoplogs = filterValidHoplogs(
+ validFiles.toArray(new FileStatus[validFiles.size()]), markers);
+
+ ArrayList<Hoplog> results = new ArrayList<Hoplog>();
+ if (validHoplogs == null || validHoplogs.length == 0) {
+ return results;
+ }
+
+ for (int i = 0; i < validHoplogs.length; i++) {
+ // Skip directories
+ if (validHoplogs[i].isDirectory()) {
+ continue;
+ }
+
+ final Path p = validHoplogs[i].getPath();
+ // skip empty file
+ if (fs.getFileStatus(p).getLen() <= 0) {
+ continue;
+ }
+
+ Hoplog hoplog = new HFileSortedOplog(store, p, store.getBlockCache(), stats, store.getStats());
+ results.add(hoplog);
+ }
+
+ return results;
+ }
+
+ /**
+ * @return the largest sequence number among the given hoplogs, or 0 when
+ * the list is empty
+ */
+ private static int findMaxSequenceNumber(List<Hoplog> hoplogs) throws IOException {
+ int maxSeq = 0;
+ for (Hoplog hoplog : hoplogs) {
+ maxSeq = Math.max(maxSeq, getSequenceNumber(hoplog));
+ }
+ return maxSeq;
+ }
+
+ /**
+ * @return the sequence number associate with a hoplog file
+ */
+ static int getSequenceNumber(Hoplog hoplog) {
+ // file name must match SORTED_HOPLOG_PATTERN; presumably capture group 3
+ // of HOPLOG_NAME_REGEX is the sequence number — pattern defined elsewhere
+ Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(hoplog.getFileName());
+ boolean matched = matcher.find();
+ assert matched;
+ return Integer.valueOf(matcher.group(3));
+ }
+
+ /**
+ * Lists expiry-marker files in the bucket directory (files ending with the
+ * expired extension whose base name matches the sorted-hoplog pattern).
+ * Returns null when the bucket directory does not exist and no hoplogs are
+ * registered.
+ */
+ protected FileStatus[] getExpiryMarkers() throws IOException {
+ FileSystem fs = store.getFileSystem();
+ if (hoplogReadersController.hoplogs == null
+ || hoplogReadersController.hoplogs.size() == 0) {
+ // there are no hoplogs in the system. May be the bucket is not existing
+ // at all.
+ if (!fs.exists(bucketPath)) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}This bucket is unused, skipping expired hoplog check", logPrefix);
+ return null;
+ }
+ }
+
+ FileStatus files[] = FSUtils.listStatus(fs, bucketPath, new PathFilter() {
+ @Override
+ public boolean accept(Path file) {
+ // All expired hoplog end with expire extension and must match the valid file regex
+ String fileName = file.getName();
+ if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
+ return false;
+ }
+ fileName = truncateExpiryExtension(fileName);
+ Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(fileName);
+ return matcher.find();
+ }
+
+ });
+ return files;
+ }
+
+ /**
+ * Clears the bucket by marking every current hoplog for deletion. Files are
+ * physically removed later by the cleanup pass; compaction is suspended for
+ * the duration so it does not race the clear.
+ */
+ @Override
+ public void clear() throws IOException {
+ //Suspend compaction while we are doing the clear. This
+ //aborts the currently in progress compaction.
+ getCompactor().suspend();
+
+ // while compaction is suspended, clear method marks hoplogs for deletion
+ // only. Files will be removed by cleanup thread after active gets and
+ // iterations are completed
+ String user = logger.isDebugEnabled() ? "clear" : null;
+ List<TrackedReference<Hoplog>> oplogs = null;
+ try {
+ oplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+ markSortedOplogForDeletion(oplogs, true);
+ } finally {
+ if (oplogs != null) {
+ hoplogReadersController.releaseHoplogs(oplogs, user);
+ }
+ //Resume compaction
+ getCompactor().resume();
+ }
+ }
+
+ /**
+ * Performs the following activities
+ * <UL>
+ * <LI>Submits compaction requests as needed
+ * <LI>Deletes tmp files which the system failed to removed earlier
+ * </UL>
+ */
+ @Override
+ public void performMaintenance() throws IOException {
+ long startTime = System.currentTimeMillis();
+
+ if (logger.isDebugEnabled())
+ logger.debug("{}Executing bucket maintenance", logPrefix);
+
+ submitCompactionRequests();
+ hoplogReadersController.closeInactiveHoplogs();
+ initiateCleanup();
+
+ cleanupTmpFiles();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Time spent in bucket maintenance (in ms): "
+ + (System.currentTimeMillis() - startTime), logPrefix);
+ }
+ }
+
+ @Override
+ public Future<CompactionStatus> forceCompaction(boolean isMajor) {
+ CompactionRequest request = new CompactionRequest(regionFolder, bucketId,
+ getCompactor(), isMajor, true/*force*/);
+ return HDFSCompactionManager.getInstance(store).submitRequest(request);
+ }
+
+ private Future<CompactionStatus> forceCompactionOnVersionUpgrade() {
+ CompactionRequest request = new CompactionRequest(regionFolder, bucketId, getCompactor(), true, true, true);
+ return HDFSCompactionManager.getInstance(store).submitRequest(request);
+ }
+
+ @Override
+ public long getLastMajorCompactionTimestamp() {
+ long ts = 0;
+ String user = logger.isDebugEnabled() ? "StoredProc" : null;
+ List<TrackedReference<Hoplog>> hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+ try {
+ for (TrackedReference<Hoplog> hoplog : hoplogs) {
+ String fileName = hoplog.get().getFileName();
+ Matcher file = HOPLOG_NAME_PATTERN.matcher(fileName);
+ if (file.matches() && fileName.endsWith(MAJOR_HOPLOG_EXTENSION)) {
+ ts = getHoplogTimestamp(file);
+ break;
+ }
+ }
+ } finally {
+ hoplogReadersController.releaseHoplogs(hoplogs, user);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}HDFS: for bucket:"+getRegionBucketStr()+" returning last major compaction timestamp "+ts, logPrefix);
+ }
+ return ts;
+ }
+
+ private void initOldTmpFiles() throws IOException {
+ FileSystem fs = store.getFileSystem();
+ if (! fs.exists(bucketPath)) {
+ return;
+ }
+
+ oldTmpFiles = new LinkedList<FileStatus>(Arrays.asList(fs.listStatus(bucketPath, new TmpFilePathFilter())));
+ }
+
+ private void cleanupTmpFiles() throws IOException {
+ if(oldTmpFiles == null && tmpFiles == null) {
+ return;
+ }
+
+ if (oldTmpFiles != null) {
+ FileSystem fs = store.getFileSystem();
+ long now = System.currentTimeMillis();
+ for (Iterator<FileStatus> itr = oldTmpFiles.iterator(); itr.hasNext();) {
+ FileStatus file = itr.next();
+ if(file.getModificationTime() + TMP_FILE_EXPIRATION_TIME_MS > now) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Deleting temporary file:" + file.getPath(), logPrefix);
+ }
+ fs.delete(file.getPath(), false);
+ itr.remove();
+ }
+ }
+ }
+ if (tmpFiles != null) {
+ for (Hoplog so : tmpFiles.keySet()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Deleting temporary file:" + so.getFileName(), logPrefix);
+ }
+ deleteTmpFile(null, so);
+ }
+ }
+ }
+
/**
 * Executes tiered compaction of hoplog files. One instance of compacter per
 * bucket will exist. Minor and major cycles are mutually serialized through
 * {@code maxMajorCSeqNum}; a minor cycle that overlaps a concurrent major
 * cycle is preempted.
 */
protected class HoplogCompactor implements Compactor {
  // when true, in-progress compaction cycles abort at their next check
  private volatile boolean suspend = false;

  // the following boolean will be used to synchronize minor compaction
  private AtomicBoolean isMinorCompactionActive = new AtomicBoolean(false);
  // the following boolean will be used to synchronize major compaction
  private AtomicBoolean isMajorCompactionActive = new AtomicBoolean(false);
  // the following integer tracks the max sequence number amongst the
  // target files being major compacted. This value will be used to prevent
  // concurrent MajorC and minorC. MinorC is preempted in case of an
  // overlap. This object is also used as a lock. The lock is acquired before
  // identifying compaction targets and before marking targets for expiry
  final AtomicInteger maxMajorCSeqNum = new AtomicInteger(-1);

  /**
   * Sets the suspend flag and busy-waits (50 ms steps) until any active
   * major or minor cycle observes it and stops, or the configured maximum
   * wait elapses.
   */
  @Override
  public void suspend() {
    long wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
    this.suspend = true;
    // this forces the compact method to finish.
    while (isMajorCompactionActive.get() || isMinorCompactionActive.get()) {
      if (wait < 0) {
        // NOTE(review): wait is reset to the configured maximum before being
        // logged, so the warning reports the limit rather than the elapsed
        // time — confirm that is the intent.
        wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
        String act = isMajorCompactionActive.get() ? "MajorC" : "MinorC";
        logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_SUSPEND_OF_0_FAILED_IN_1, new Object[] {act, wait}));
        break;
      }
      try {
        TimeUnit.MILLISECONDS.sleep(50);
        wait -= 50;
      } catch (InterruptedException e) {
        break;
      }
    }
  }

  /** Clears the suspend flag so future compaction cycles may run. */
  @Override
  public void resume() {
    this.suspend = false;
  }

  /** @return true if a compaction cycle of the given type is running */
  @Override
  public boolean isBusy(boolean isMajor) {
    if (isMajor) {
      return isMajorCompactionActive.get();
    } else {
      return isMinorCompactionActive.get();
    }
  }

  /**
   * compacts hoplogs. The method takes a minor or major compaction "lock" to
   * prevent concurrent execution of compaction cycles. A possible improvement
   * is to allow parallel execution of minor compaction if the sets of
   * hoplogs being compacted are disjoint.
   *
   * @param isMajor true for a major cycle, false for minor
   * @param isForced true to compact even when policy checks would skip it
   * @return true when the cycle completed or was legitimately skipped; false
   *         when it was suspended, preempted, or another cycle was active
   */
  @Override
  public boolean compact(boolean isMajor, boolean isForced) throws IOException {
    if (suspend) {
      return false;
    }

    String extension = null;
    IOOperation compactionStats = null;
    long startTime = 0;
    final AtomicBoolean lock;
    Hoplog compactedHoplog = null;
    List<TrackedReference<Hoplog>> targets = null;
    String user = logger.isDebugEnabled() ? (isMajor ? "MajorC" : "MinorC") : null;

    if (isMajor) {
      lock = isMajorCompactionActive;
      extension = MAJOR_HOPLOG_EXTENSION;
      compactionStats = stats.getMajorCompaction();
    } else {
      lock = isMinorCompactionActive;
      extension = MINOR_HOPLOG_EXTENSION;
      compactionStats = stats.getMinorCompaction();
    }

    // final check before beginning compaction. Return if compaction is active
    if (!lock.compareAndSet(false, true)) {
      if (isMajor) {
        if (logger.isDebugEnabled())
          logger.debug("{}Major compaction already active. Ignoring new request", logPrefix);
      } else {
        if (logger.isDebugEnabled())
          // NOTE(review): message lacks the "{}" placeholder, so logPrefix is
          // not rendered here.
          logger.debug("Minor compaction already active. Ignoring new request", logPrefix);
      }
      return false;
    }

    try {
      if (suspend) {
        return false;
      }

      // variables for updating stats
      startTime = compactionStats.begin();

      int seqNum = -1;
      int lastKnownMajorCSeqNum;
      // the lock keeps target selection consistent with the sequence number
      // published for a concurrent major cycle
      synchronized (maxMajorCSeqNum) {
        lastKnownMajorCSeqNum = maxMajorCSeqNum.get();
        targets = hoplogReadersController.getTrackedSortedOplogList(user);
        getCompactionTargets(isMajor, targets, lastKnownMajorCSeqNum);
        if (targets != null && targets.size() > 0) {
          targets = Collections.unmodifiableList(targets);
          seqNum = getSequenceNumber(targets.get(0).get());
          if (isMajor) {
            maxMajorCSeqNum.set(seqNum);
          }
        }
      }

      if (targets == null || targets.isEmpty() || (!isMajor && targets.size() == 1 && !isForced)) {
        if (logger.isDebugEnabled()) {
          logger.debug("{}Skipping compaction, too few hoplops to compact. Major?" + isMajor, logPrefix);
        }

        compactionStats.end(0, startTime);
        return true;
      }

      // In case that we only have one major compacted file, we don't need to run major compaction to
      // generate a copy of the same content
      if (targets.size() == 1 && !isForced) {
        String hoplogName = targets.get(0).get().getFileName();
        if (hoplogName.endsWith(MAJOR_HOPLOG_EXTENSION)) {
          if (logger.isDebugEnabled()) {
            logger.debug("{}Skipping compaction, no need to compact a major compacted file. Major?" + isMajor, logPrefix);
          }
          compactionStats.end(0, startTime);
          return true;
        }
      }

      if (logger.isDebugEnabled()) {
        for (TrackedReference<Hoplog> target : targets) {
          if (logger.isDebugEnabled()) {
            fineLog("Target:", target, " size:", target.get().getSize());
          }
        }
      }

      // Create a temporary hoplog for compacted hoplog. The compacted hoplog
      // will have the seq number same as that of youngest target file. Any
      // hoplog younger than target hoplogs will have a higher sequence number
      compactedHoplog = getTmpSortedOplog(seqNum, extension);

      long byteCount;
      try {
        byteCount = fillCompactionHoplog(isMajor, targets, compactedHoplog, lastKnownMajorCSeqNum);
        compactionStats.end(byteCount, startTime);
      } catch (InterruptedException e) {
        if (logger.isDebugEnabled())
          logger.debug("{}Compaction execution suspended", logPrefix);
        compactionStats.error(startTime);
        return false;
      } catch (ForceReattemptException e) {
        if (logger.isDebugEnabled())
          logger.debug("{}Compaction execution suspended", logPrefix);
        compactionStats.error(startTime);
        return false;
      }

      // creation of compacted hoplog completed, its time to use it for
      // reading. Before using it, make sure minorC and majorC were not
      // executing on overlapping sets of files. All targets can be marked for
      // expiration. Notify listener if configured. Update bucket size
      synchronized (maxMajorCSeqNum) {
        if (!isMajor && isMinorMajorOverlap(targets, maxMajorCSeqNum.get())) {
          // MajorC is higher priority. In case of any overlap kill minorC
          if (logger.isDebugEnabled())
            logger.debug("{}Interrupting MinorC for a concurrent MajorC", logPrefix);
          compactionStats.error(startTime);
          return false;
        }
        addSortedOplog(compactedHoplog, true, false);
        markSortedOplogForDeletion(targets, true);
      }
    } catch (IOException e) {
      compactionStats.error(startTime);
      throw e;
    } finally {
      if (isMajor) {
        maxMajorCSeqNum.set(-1);
      }
      lock.set(false);
      hoplogReadersController.releaseHoplogs(targets, user);
    }

    incrementDiskUsage(compactedHoplog.getSize());
    reEstimateBucketSize();

    notifyCompactionListeners(isMajor);
    return true;
  }

  /**
   * Major compaction compacts all files. Seq number of the youngest file
   * being MajorCed is known. If MinorC is operating on any file with a seq
   * number less than this number, there is a overlap
   *
   * @param num highest sequence number being major compacted; negative means
   *            no major compaction is active
   */
  boolean isMinorMajorOverlap(List<TrackedReference<Hoplog>> targets, int num) {
    if (num < 0 || targets == null || targets.isEmpty()) {
      return false;
    }

    for (TrackedReference<Hoplog> hop : targets) {
      if (getSequenceNumber(hop.get()) <= num) {
        return true;
      }
    }

    return false;
  }

  /**
   * Iterates over targets and writes eligible targets to the output hoplog.
   * Handles creation of iterators and writer and closing it in case of
   * errors.
   *
   * @return number of key/value bytes written to the output hoplog
   * @throws InterruptedException when the cycle was suspended or preempted
   */
  public long fillCompactionHoplog(boolean isMajor,
      List<TrackedReference<Hoplog>> targets, Hoplog output, int majorCSeqNum)
      throws IOException, InterruptedException, ForceReattemptException {

    HoplogWriter writer = null;
    ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
    HoplogSetIterator mergedIter = null;
    int byteCount = 0;

    try {
      // create a merged iterator over the targets and write entries into
      // output hoplog
      mergedIter = new HoplogSetIterator(targets);
      writer = output.createWriter(mergedIter.getRemainingEntryCount());

      boolean interrupted = false;
      for (; mergedIter.hasNext();) {
        if (suspend) {
          interrupted = true;
          break;
        } else if (!isMajor && maxMajorCSeqNum.get() > majorCSeqNum) {
          // A new major compaction cycle is starting, quit minorC to avoid
          // duplicate work and missing deletes
          if (logger.isDebugEnabled())
            logger.debug("{}Preempting MinorC, new MajorC cycle detected ", logPrefix);
          interrupted = true;
          break;
        }

        mergedIter.nextBB();

        ByteBuffer k = mergedIter.getKeyBB();
        ByteBuffer v = mergedIter.getValueBB();

        boolean isDeletedEntry = isDeletedEntry(v.array(), v.arrayOffset());
        if (isMajor && isDeletedEntry) {
          // its major compaction, time to ignore deleted entries
          continue;
        }

        if (!isDeletedEntry) {
          // fold live keys into the cardinality estimate stored in the new
          // hoplog's metadata
          int hash = MurmurHash.hash(k.array(), k.arrayOffset(), k.remaining(), -1);
          localHLL.offerHashed(hash);
        }

        writer.append(k, v);
        byteCount += (k.remaining() + v.remaining());
      }

      mergedIter.close();
      mergedIter = null;

      writer.close(buildMetaData(localHLL));
      writer = null;

      if (interrupted) {
        // If we suspended compaction operations, delete the partially written
        // file and return.
        output.delete();
        throw new InterruptedException();
      }

      // ping secondaries before making the file a legitimate file to ensure
      // that in case of split brain, no other vm has taken up as primary. #50110.
      pingSecondaries();

      makeLegitimate(output);
      return byteCount;
    } catch (IOException e) {
      e = handleWriteHdfsIOError(writer, output, e);
      writer = null;
      throw e;
    } catch (ForceReattemptException e) {
      output.delete();
      throw e;
    } finally {
      if (mergedIter != null) {
        mergedIter.close();
      }

      if (writer != null) {
        writer.close();
      }
    }
  }

  /**
   * identifies targets. For major compaction all sorted oplogs will be
   * iterated on. For minor compaction, policy driven fewer targets will take
   * place.
   */
  protected void getCompactionTargets(boolean major,
      List<TrackedReference<Hoplog>> targets, int majorCSeqNum) {
    // major compaction uses the full list as-is; minor compaction prunes it
    if (!major) {
      getMinorCompactionTargets(targets, majorCSeqNum);
    }
  }

  /**
   * list of oplogs most suitable for compaction. The alogrithm selects m
   * smallest oplogs which are not bigger than X in size. Null if valid
   * candidates are not found. Hoplogs excluded from the cycle are released;
   * the chosen ones remain tracked for the caller.
   */
  void getMinorCompactionTargets(List<TrackedReference<Hoplog>> targets, int majorCSeqNum)
  {
    List<TrackedReference<Hoplog>> omittedHoplogs = new ArrayList<TrackedReference<Hoplog>>();

    // reverse the order of hoplogs in list. the oldest file becomes the first file.
    Collections.reverse(targets);

    // hoplog greater than this size will not be minor-compacted
    final long MAX_COMPACTION_FILE_SIZE;
    // maximum number of files to be included in any compaction cycle
    final int MAX_FILE_COUNT_COMPACTION;
    // minimum number of files that must be present for compaction to be worth
    final int MIN_FILE_COUNT_COMPACTION;

    MAX_COMPACTION_FILE_SIZE = ((long)store.getInputFileSizeMax()) * 1024 *1024;
    MAX_FILE_COUNT_COMPACTION = store.getInputFileCountMax();
    MIN_FILE_COUNT_COMPACTION = store.getInputFileCountMin();

    try {
      // skip till first file smaller than the max compaction file size is
      // found. And if MajorC is active, move to a file which is also outside
      // scope of MajorC
      for (Iterator<TrackedReference<Hoplog>> iterator = targets.iterator(); iterator.hasNext();) {
        TrackedReference<Hoplog> oplog = iterator.next();
        if (majorCSeqNum >= getSequenceNumber(oplog.get())) {
          iterator.remove();
          omittedHoplogs.add(oplog);
          if (logger.isDebugEnabled()) {
            fineLog("Overlap with MajorC, excluding hoplog " + oplog.get());
          }
          continue;
        }

        if (oplog.get().getSize() > MAX_COMPACTION_FILE_SIZE || oplog.get().getFileName().endsWith(MAJOR_HOPLOG_EXTENSION)) {
          // big file will not be included for minor compaction
          // major compacted file will not be converted to minor compacted file
          iterator.remove();
          omittedHoplogs.add(oplog);
          if (logger.isDebugEnabled()) {
            fineLog("Excluding big hoplog from minor cycle:",
                oplog.get(), " size:", oplog.get().getSize(), " limit:",
                MAX_COMPACTION_FILE_SIZE);
          }
        } else {
          // first small hoplog found, skip the loop
          break;
        }
      }

      // If there are too few files no need to perform compaction
      if (targets.size() < MIN_FILE_COUNT_COMPACTION) {
        if (logger.isDebugEnabled()) {
          logger.debug("{}Too few hoplogs for minor cycle:" + targets.size(), logPrefix);
        }
        omittedHoplogs.addAll(targets);
        targets.clear();
        return;
      }

      float maxGain = Float.MIN_VALUE;
      int bestFrom = -1;
      int bestTo = -1;

      // for listSize=5 list, minFile=3; maxIndex=5-3.
      // so from takes values 0,1,2
      int maxIndexForFrom = targets.size() - MIN_FILE_COUNT_COMPACTION;
      for (int from = 0; from <= maxIndexForFrom ; from++) {
        // for listSize=6 list, minFile=3, maxFile=5; minTo=0+3-1, maxTo=0+5-1
        // so to takes values 2,3,4
        int minIndexForTo = from + MIN_FILE_COUNT_COMPACTION - 1;
        int maxIndexForTo = Math.min(from + MAX_FILE_COUNT_COMPACTION, targets.size());

        for (int i = minIndexForTo; i < maxIndexForTo; i++) {
          Float gain = computeGain(from, i, targets);
          if (gain == null) {
            continue;
          }

          if (gain > maxGain) {
            maxGain = gain;
            bestFrom = from;
            bestTo = i;
          }
        }
      }

      if (bestFrom == -1) {
        if (logger.isDebugEnabled())
          logger.debug("{}Failed to find optimal target set for MinorC", logPrefix);
        omittedHoplogs.addAll(targets);
        targets.clear();
        return;
      }

      if (logger.isDebugEnabled()) {
        fineLog("MinorCTarget optimal result from:", bestFrom, " to:", bestTo);
      }

      // remove hoplogs they do not fall in the bestFrom-bestTo range
      int i = 0;
      for (Iterator<TrackedReference<Hoplog>> iter = targets.iterator(); iter.hasNext();) {
        TrackedReference<Hoplog> hop = iter.next();
        if (i < bestFrom || i > bestTo) {
          iter.remove();
          omittedHoplogs.add(hop);
        }
        i++;
      }
    } finally {
      // release readers of targets not included in the compaction cycle
      String user = logger.isDebugEnabled() ? "MinorC" : null;
      hoplogReadersController.releaseHoplogs(omittedHoplogs, user);
    }

    // restore the order, youngest file is the first file again
    Collections.reverse(targets);
  }

  /** @return the HDFS store configuration backing this organizer */
  @Override
  public HDFSStore getHdfsStore() {
    return store;
  }
}
+
+ Float computeGain(int from, int to, List<TrackedReference<Hoplog>> targets) {
+ double SIZE_64K = 64.0 * 1024;
+ // TODO the base for log should depend on the average number of keys a index block will contain
+ double LOG_BASE = Math.log(AVG_NUM_KEYS_PER_INDEX_BLOCK);
+
+ long totalSize = 0;
+ double costBefore = 0f;
+ for (int i = from; i <= to; i++) {
+ long size = targets.get(i).get().getSize();
+ if (size == 0) {
+ continue;
+ }
+ totalSize += size;
+
+ // For each hoplog file, read cost is number of index block reads and 1
+ // data block read. Index blocks on an average contain N keys and are
+ // organized in a N-ary tree structure. Hence the number of index block
+ // reads will be logBaseN(number of data blocks)
+ costBefore += Math.ceil(Math.max(1.0, Math.log(size / SIZE_64K) / LOG_BASE)) + 1;
+ }
+
+ // if the first file is relatively too large this set is bad for compaction
+ long firstFileSize = targets.get(from).get().getSize();
+ if (firstFileSize > (totalSize - firstFileSize) * RATIO) {
+ if (logger.isDebugEnabled()){
+ fineLog("First file too big:", firstFileSize, " totalSize:", totalSize);
+ }
+ return null;
+ }
+
+ // compute size in mb so that the value of gain is in few decimals
+ long totalSizeInMb = totalSize / 1024 / 1024;
+ if (totalSizeInMb == 0) {
+ // the files are tooooo small, just return the count. The more we compact
+ // the better it is
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Total size too small:" +totalSize, logPrefix);
+ }
+ return (float) costBefore;
+ }
+
+ double costAfter = Math.ceil(Math.log(totalSize / SIZE_64K) / LOG_BASE) + 1;
+ return (float) ((costBefore - costAfter) / totalSizeInMb);
+ }
+
+ /**
+ * Hoplog readers are accessed asynchronously. There could be a window in
+ * which, while a hoplog is being iterated on, it gets compacted and becomes
+ * expired or inactive. The reader of the hoplog must not be closed till the
+ * iterator completes. All such scenarios will be managed by this class. It
+ * will keep all the reader, active and inactive, and reference counter to the
+ * readers. An inactive reader will be closed if the reference count goes down
+ * to 0.
+ *
+ * One important point, only compaction process makes a hoplog inactive.
+ * Compaction process in a bucket is single threaded. So compaction itself
+ * will not face race condition. Read and scan operations on the bucket will
+ * be affected. So reference counter is incremented for each read and scan.
+ *
+ * @author ashvina
+ */
private class HoplogReadersController implements HoplogReaderActivityListener {
  // upper bound on concurrently open hdfs file readers for this bucket
  private Integer maxOpenFilesLimit;

  // sorted collection of all the active oplog files associated with this bucket. Instead of a
  // queue, a set is used. New files created as part of compaction may be inserted after a few
  // hoplogs were created. The compacted file is such a case but should not be treated newest.
  private final ConcurrentSkipListSet<TrackedReference<Hoplog>> hoplogs;

  // list of all the hoplogs that have been compacted and need to be closed
  // once the reference count reduces to 0
  private final ConcurrentHashSet<TrackedReference<Hoplog>> inactiveHoplogs;

  // ReadWriteLock on list of oplogs to allow for consistent reads and scans
  // while hoplog set changes. A write lock is needed on completion of
  // compaction, addition of a new hoplog or on receiving updates message from
  // other GF nodes
  private final ReadWriteLock hoplogRWLock = new ReentrantReadWriteLock(true);

  // tracks the number of active readers for hoplogs of this bucket
  private AtomicInteger activeReaderCount = new AtomicInteger(0);

  public HoplogReadersController() {
    HoplogComparator comp = new HoplogComparator();
    // both collections override add/remove so the active/inactive file
    // statistics stay in sync with membership changes
    hoplogs = new ConcurrentSkipListSet<TrackedReference<Hoplog>>(comp) {
      private static final long serialVersionUID = 1L;

      @Override
      public boolean add(TrackedReference<Hoplog> e) {
        // increment number of hoplogs active for this bucket
        boolean result = super.add(e);
        if (result)
          stats.incActiveFiles(1);
        return result;
      }

      @Override
      public boolean remove(Object o) {
        // decrement the number of hoplogs active for this bucket
        boolean result = super.remove(o);
        if (result)
          stats.incActiveFiles(-1);
        return result;
      }
    };

    inactiveHoplogs = new ConcurrentHashSet<TrackedReference<Hoplog>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public boolean add(TrackedReference<Hoplog> e) {
        boolean result = super.add(e);
        if (result)
          stats.incInactiveFiles(1);
        return result;
      }

      @Override
      public boolean remove(Object o) {
        boolean result = super.remove(o);
        if (result)
          stats.incInactiveFiles(-1);
        return result;
      }
    };

    maxOpenFilesLimit = Integer.getInteger(
        HoplogConfig.BUCKET_MAX_OPEN_HFILES_CONF,
        HoplogConfig.BUCKET_MAX_OPEN_HFILES_DEFAULT);
  }

  /** @return the oldest active hoplog, or null when none exist */
  Hoplog getOldestHoplog() {
    if (hoplogs.isEmpty()) {
      return null;
    }
    return hoplogs.last().get();
  }

  /**
   * locks sorted oplogs collection and performs add operation
   * @return if addition was successful
   */
  private boolean addSortedOplog(Hoplog so) throws IOException {
    if (logger.isDebugEnabled()) {
      logger.debug("{}Try add " + so, logPrefix);
    }
    hoplogRWLock.writeLock().lock();
    try {
      int size = hoplogs.size();
      boolean result = hoplogs.add(new TrackedReference<Hoplog>(so));
      so.setReaderActivityListener(this);
      if (logger.isDebugEnabled()) {
        fineLog("Added: ", so, " Before:", size, " After:", hoplogs.size());
      }
      return result;
    } finally {
      hoplogRWLock.writeLock().unlock();
    }
  }

  /**
   * locks sorted oplogs collection and performs remove operation and updates readers also
   */
  private void removeSortedOplog(TrackedReference<Hoplog> so) throws IOException {
    if (logger.isDebugEnabled()) {
      // NOTE(review): message lacks the "{}" placeholder, so logPrefix is not
      // rendered here.
      logger.debug("Try remove " + so, logPrefix);
    }
    hoplogRWLock.writeLock().lock();
    try {
      int size = hoplogs.size();
      boolean result = hoplogs.remove(so);
      if (result) {
        // removed from active set; keep it open until its refcount drains
        inactiveHoplogs.add(so);
        if (logger.isDebugEnabled()) {
          fineLog("Removed: ", so, " Before:", size, " After:", hoplogs.size());
        }
      } else {
        if (inactiveHoplogs.contains(so)) {
          if (logger.isDebugEnabled()) {
            logger.debug("{}Found a missing active hoplog in inactive list." + so, logPrefix);
          }
        } else {
          // unknown to both lists: close defensively and warn
          so.get().close();
          logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_MISSING_IN_BUCKET_FORCED_CLOSED, so.get()));
        }
      }
    } finally {
      hoplogRWLock.writeLock().unlock();
    }
  }

  /**
   * Closes and drops every inactive hoplog whose reference count has drained
   * to zero.
   */
  private void closeInactiveHoplogs() throws IOException {
    hoplogRWLock.writeLock().lock();
    try {
      for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
        if (logger.isDebugEnabled()) {
          logger.debug("{}Try close inactive " + hoplog, logPrefix);
        }

        if (!hoplog.inUse()) {
          int size = inactiveHoplogs.size();
          inactiveHoplogs.remove(hoplog);
          closeReaderAndSuppressError(hoplog.get(), true);
          if (logger.isDebugEnabled()) {
            fineLog("Closed inactive: ", hoplog.get(), " Before:", size,
                " After:", inactiveHoplogs.size());
          }
        }
      }
    } finally {
      hoplogRWLock.writeLock().unlock();
    }
  }

  /**
   * @param target
   * name of the hoplog file
   * @return trackedReference if target exists in inactive hoplog list.
   * @throws IOException
   */
  TrackedReference<Hoplog> getInactiveHoplog(String target) throws IOException {
    hoplogRWLock.writeLock().lock();
    try {
      for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
        if (hoplog.get().getFileName().equals(target)) {
          if (logger.isDebugEnabled()) {
            logger.debug("{}Target found in inactive hoplogs list: " + hoplog, logPrefix);
          }
          return hoplog;
        }
      }
      if (logger.isDebugEnabled()) {
        logger.debug("{}Target not found in inactive hoplogs list: " + target, logPrefix);
      }
      return null;
    } finally {
      hoplogRWLock.writeLock().unlock();
    }
  }

  /**
   * force closes all readers
   */
  public void close() throws IOException {
    hoplogRWLock.writeLock().lock();
    try {
      for (TrackedReference<Hoplog> hoplog : hoplogs) {
        closeReaderAndSuppressError(hoplog.get(), true);
      }

      for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
        closeReaderAndSuppressError(hoplog.get(), true);
      }
    } finally {
      hoplogs.clear();
      inactiveHoplogs.clear();
      hoplogRWLock.writeLock().unlock();
    }
  }

  /**
   * locks hoplogs to create a snapshot of active hoplogs. reference of each
   * reader is incremented to keep it from getting closed
   *
   * @return ordered list of sorted oplogs
   */
  private List<TrackedReference<Hoplog>> getTrackedSortedOplogList(String user) {
    List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
    hoplogRWLock.readLock().lock();
    try {
      for (TrackedReference<Hoplog> oplog : hoplogs) {
        oplog.increment(user);
        oplogs.add(oplog);
        if (logger.isDebugEnabled()) {
          logger.debug("{}Track ref " + oplog, logPrefix);
        }
      }
    } finally {
      hoplogRWLock.readLock().unlock();
    }
    return oplogs;
  }

  /**
   * Increments the reference count of the tracked reference matching the
   * given hoplog's file name.
   *
   * @throws NoSuchElementException when the hoplog is not in the active set
   */
  private TrackedReference<Hoplog> trackHoplog(Hoplog hoplog, String user) {
    hoplogRWLock.readLock().lock();
    try {
      for (TrackedReference<Hoplog> oplog : hoplogs) {
        if (oplog.get().getFileName().equals(hoplog.getFileName())) {
          oplog.increment(user);
          if (logger.isDebugEnabled()) {
            logger.debug("{}Track " + oplog, logPrefix);
          }
          return oplog;
        }
      }
    } finally {
      hoplogRWLock.readLock().unlock();
    }
    throw new NoSuchElementException(hoplog.getFileName());
  }

  /**
   * Releases each reference in the list, youngest reference last.
   */
  public void releaseHoplogs(List<TrackedReference<Hoplog>> targets, String user) {
    if (targets == null) {
      return;
    }

    for (int i = targets.size() - 1; i >= 0; i--) {
      TrackedReference<Hoplog> hoplog = targets.get(i);
      releaseHoplog(hoplog, user);
    }
  }

  /**
   * Decrements the reference count; when it drains to zero, closes the
   * reader if the hoplog is inactive, or trims excess open readers if it is
   * still active.
   */
  public void releaseHoplog(TrackedReference<Hoplog> target, String user) {
    if (target == null) {
      return;
    }

    target.decrement(user);
    if (logger.isDebugEnabled()) {
      logger.debug("{}Try release " + target, logPrefix);
    }
    if (target.inUse()) {
      return;
    }

    // there are no users of this hoplog. if it is inactive close it.
    hoplogRWLock.writeLock().lock();
    try {
      // re-check under the write lock; another thread may have re-acquired it
      if (!target.inUse()) {
        if (inactiveHoplogs.contains(target) ) {
          int sizeBefore = inactiveHoplogs.size();
          inactiveHoplogs.remove(target);
          closeReaderAndSuppressError(target.get(), true);
          if (logger.isDebugEnabled()) {
            fineLog("Closed inactive: ", target, " totalBefore:", sizeBefore,
                " totalAfter:", inactiveHoplogs.size());
          }
        } else if (hoplogs.contains(target)) {
          closeExcessReaders();
        }
      }
    } catch (IOException e) {
      logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR,
          "Close reader: " + target.get().getFileName()), e);
    } finally {
      hoplogRWLock.writeLock().unlock();
    }
  }

  /*
   * detects if the total number of open hdfs readers is more than configured
   * max file limit. In case the limit is exceeded, some readers need to be
   * closed to avoid datanode receiver overflow error.
   */
  private void closeExcessReaders() throws IOException {
    if (logger.isDebugEnabled()) {
      logger.debug("{}Close excess readers. Size:" + hoplogs.size()
          + " activeReaders:" + activeReaderCount.get() + " limit:"
          + maxOpenFilesLimit, logPrefix);
    }

    if (hoplogs.size() <= maxOpenFilesLimit) {
      return;
    }

    if (activeReaderCount.get() <= maxOpenFilesLimit) {
      return;
    }

    // close idle readers oldest-first until under the limit
    for (TrackedReference<Hoplog> hoplog : hoplogs.descendingSet()) {
      if (!hoplog.inUse() && !hoplog.get().isClosed()) {
        hoplog.get().close(false);
        if (logger.isDebugEnabled()) {
          logger.debug("{}Excess reader closed " + hoplog, logPrefix);
        }
      }

      if (activeReaderCount.get() <= maxOpenFilesLimit) {
        return;
      }
    }
  }

  /** Callback from a hoplog when it opens a reader; updates counters/stats. */
  @Override
  public void readerCreated() {
    activeReaderCount.incrementAndGet();
    stats.incActiveReaders(1);
    if (logger.isDebugEnabled())
      logger.debug("{}ActiveReader++", logPrefix);
  }

  /** Callback from a hoplog when it closes a reader; updates counters/stats. */
  @Override
  public void readerClosed() {
    activeReaderCount.decrementAndGet();
    stats.incActiveReaders(-1);
    if (logger.isDebugEnabled())
      logger.debug("{}ActiveReader--", logPrefix);
  }
}
+
+ /**
+ * returns an ordered list of oplogs, FOR TESTING ONLY
+ */
+ public List<TrackedReference<Hoplog>> getSortedOplogs() throws IOException {
+ List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
+ for (TrackedReference<Hoplog> oplog : hoplogReadersController.hoplogs) {
+ oplogs.add(oplog);
+ }
+ return oplogs;
+ }
+
/**
 * Merged iterator on a list of hoplogs. Holds a tracked reference on every
 * hoplog for the lifetime of the iteration; {@link #close()} releases them.
 */
public class BucketIterator implements HoplogIterator<byte[], SortedHoplogPersistedEvent> {
  // list of hoplogs to be iterated on.
  final List<TrackedReference<Hoplog>> hoplogList;
  HoplogSetIterator mergedIter;

  public BucketIterator(List<TrackedReference<Hoplog>> hoplogs) throws IOException {
    this.hoplogList = hoplogs;
    try {
      mergedIter = new HoplogSetIterator(this.hoplogList);
      if (logger.isDebugEnabled()) {
        for (TrackedReference<Hoplog> hoplog : hoplogs) {
          logger.debug("{}BucketIter target hop:" + hoplog.get().getFileName(), logPrefix);
        }
      }
    } catch (IllegalArgumentException e) {
      // the merged iterator wraps read failures in IllegalArgumentException;
      // unwrap and translate the underlying IOException
      if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
        throw handleIOError((IOException) e.getCause());
      } else {
        throw e;
      }
    } catch (IOException e) {
      throw handleIOError(e);
    } catch (HDFSIOException e) {
      throw handleIOError(e);
    }
  }

  @Override
  public boolean hasNext() {
    return mergedIter.hasNext();
  }

  @Override
  public byte[] next() throws IOException {
    try {
      return HFileSortedOplog.byteBufferToArray(mergedIter.next());
    } catch (IllegalArgumentException e) {
      // same unwrap-and-translate as in the constructor
      if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
        throw handleIOError((IOException) e.getCause());
      } else {
        throw e;
      }
    } catch (IOException e) {
      throw handleIOError(e);
    }
  }

  @Override
  public byte[] getKey() {
    // merged iterator returns a byte[]. This needs to be deserialized to the object which was
    // provided during flush operation
    return HFileSortedOplog.byteBufferToArray(mergedIter.getKey());
  }

  @Override
  public SortedHoplogPersistedEvent getValue() {
    // merged iterator returns a byte[]. This needs to be deserialized to the
    // object which was provided during flush operation
    try {
      return deserializeValue(HFileSortedOplog.byteBufferToArray(mergedIter.getValue()));
    } catch (IOException e) {
      throw new HDFSIOException("Failed to deserialize byte while iterating on partition", e);
    }
  }

  @Override
  public void remove() {
    mergedIter.remove();
  }

  @Override
  public void close() {
    // TODO release the closed iterators early
    String user = logger.isDebugEnabled() ? "Scan" : null;
    hoplogReadersController.releaseHoplogs(hoplogList, user);
  }
}
+
+ /**
+ * This utility class is used to filter temporary hoplogs in a bucket
+ * directory
+ *
+ * @author ashvina
+ */
+ private static class TmpFilePathFilter implements PathFilter {
+ @Override
+ public boolean accept(Path path) {
+ Matcher matcher = HOPLOG_NAME_PATTERN.matcher(path.getName());
+ if (matcher.matches() && path.getName().endsWith(TEMP_HOPLOG_EXTENSION)) {
+ return true;
+ }
+ return false;
+ }
+ }
+
+ private void fineLog(Object... strings) {
+ if (logger.isDebugEnabled()) {
+ StringBuffer sb = concatString(strings);
+ logger.debug(logPrefix + sb.toString());
+ }
+ }
+
+ private StringBuffer concatString(Object... strings) {
+ StringBuffer sb = new StringBuffer();
+ for (Object str : strings) {
+ sb.append(str.toString());
+ }
+ return sb;
+ }
+
/**
 * No-op: this organizer reacts only to hoplog addition and deletion events,
 * so compaction-completion notifications are intentionally ignored.
 */
@Override
public void compactionCompleted(String region, int bucket, boolean isMajor) {
  // do nothing for compaction events. Hoplog Organizer depends on addition
  // and deletion of hoplogs only
}
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
index 113e49b,0000000..a5030ae
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
@@@ -1,263 -1,0 +1,264 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
++import com.gemstone.gemfire.internal.hll.ICardinality;
++
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumMap;
+
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
+
+/**
+ * Ordered sequence file
+ */
+public interface Hoplog extends Closeable, Comparable<Hoplog> {
+ public static final boolean NOP_WRITE = Boolean.getBoolean("Hoplog.NOP_WRITE");
+
+ /** the gemfire magic number for sorted oplogs */
+ public static final byte[] MAGIC = new byte[] { 0x47, 0x53, 0x4F, 0x50 };
+
+ /**
+ * @return an instance of cached reader, creates one if does not exist
+ * @throws IOException
+ */
+ HoplogReader getReader() throws IOException;
+
+ /**
+ * Creates a new sorted writer.
+ *
+ * @param keys
+ * an estimate of the number of keys to be written
+ * @return the writer
+ * @throws IOException
+ * error creating writer
+ */
+ HoplogWriter createWriter(int keys) throws IOException;
+
+ /**
+ * @param listener listener of reader's activity
+ */
+ void setReaderActivityListener(HoplogReaderActivityListener listener);
+
+ /**
+ * @return file name
+ */
+ String getFileName();
+
+ /**
+ * @return Entry count estimate for this hoplog
+ */
+ public ICardinality getEntryCountEstimate() throws IOException;
+
+ /**
+ * renames the file to the input name
+ *
+ * @throws IOException
+ */
+ void rename(String name) throws IOException;
+
+ /**
+ * Deletes the sorted oplog file
+ */
+ void delete() throws IOException;
+
+ /**
+ * Returns true if the hoplog is closed for reads.
+ * @return true if closed
+ */
+ boolean isClosed();
+
+ /**
+ * @param clearCache clear this sorted oplog's cache if true
+ * @throws IOException
+ */
+ void close(boolean clearCache) throws IOException;
+
+ /**
+ * @return the modification timestamp of the file
+ */
+ long getModificationTimeStamp();
+
+ /**
+ * @return the size of file
+ */
+ long getSize();
+
+ /**
+ * Reads sorted oplog file.
+ */
+ public interface HoplogReader extends HoplogSetReader<byte[], byte[]> {
+ /**
+ * Returns a byte buffer based view of the value linked to the key
+ */
+ ByteBuffer get(byte[] key) throws IOException;
+
+ /**
+ * @return Returns the bloom filter associated with this sorted oplog file.
+ */
+ BloomFilter getBloomFilter() throws IOException;
+
+ /**
+ * @return number of KV pairs in the file, including tombstone entries
+ */
+ long getEntryCount();
+
+ /**
+ * Returns the {@link ICardinality} implementation that is useful for
+ * estimating the size of this Hoplog.
+ *
+ * @return the cardinality estimator
+ */
+ ICardinality getCardinalityEstimator();
+ }
+
+ /**
+ * Provides hoplog's reader's activity related events to owners
+ *
+ * @author ashvina
+ */
+ public interface HoplogReaderActivityListener {
+ /**
+ * Invoked when a reader is created and an active reader did not exist
+ * earlier
+ */
+ public void readerCreated();
+
+ /**
+ * Invoked when an active reader is closed
+ */
+ public void readerClosed();
+ }
+
+ /**
+ * Writes key/value pairs in a sorted oplog file. Each entry that is appended must have a key that
+ * is greater than or equal to the previous key.
+ */
+ public interface HoplogWriter extends Closeable {
+ /**
+ * Appends another key and value. The key is expected to be greater than or equal to the last
+ * key that was appended.
+ * @param key
+ * @param value
+ */
+ void append(byte[] key, byte[] value) throws IOException;
+
+ /**
+ * Appends another key and value. The key is expected to be greater than or equal to the last
+ * key that was appended.
+ */
+ void append(ByteBuffer key, ByteBuffer value) throws IOException;
+
+ void close(EnumMap<Meta, byte[]> metadata) throws IOException;
+
+ /**
+ * flushes all outstanding data into the OS buffers on all DN replicas
+ * @throws IOException
+ */
+ void hsync() throws IOException;
+
+ /**
+ * Gets the size of the data that has already been written
+ * to the writer.
+ *
+ * @return number of bytes already written to the writer
+ */
+ public long getCurrentSize() throws IOException;
+ }
+
+ /**
+ * Identifies the gemfire sorted oplog versions.
+ */
+ public enum HoplogVersion {
+ V1;
+
+ /**
+ * Returns the version string as bytes.
+ *
+ * @return the byte form
+ */
+ public byte[] toBytes() {
+ return name().getBytes();
+ }
+
+ /**
+ * Constructs the version from a byte array.
+ *
+ * @param version
+ * the byte form of the version
+ * @return the version enum
+ */
+ public static HoplogVersion fromBytes(byte[] version) {
+ return HoplogVersion.valueOf(new String(version));
+ }
+ }
+
+ /**
+ * Names the available metadata keys that will be stored in the sorted oplog.
+ */
+ public enum Meta {
+ /** identifies the soplog as a gemfire file, required */
+ GEMFIRE_MAGIC,
+
+ /** identifies the soplog version, required */
+ SORTED_OPLOG_VERSION,
+
+ /** identifies the gemfire version the soplog was created with */
+ GEMFIRE_VERSION,
+
+ /** identifies the statistics data */
+ STATISTICS,
+
+ /** identifies the embedded comparator types */
+ COMPARATORS,
+
+ /** identifies the pdx type data, optional */
+ PDX,
+
+ /**
+ * identifies the hyperLogLog byte[] which estimates the cardinality for
+ * only one hoplog
+ */
+ LOCAL_CARDINALITY_ESTIMATE,
+
+ /**
+ * represents the hyperLogLog byte[] after upgrading the constant from
+ * 0.1 to 0.03 (in gfxd 1.4)
+ */
+ LOCAL_CARDINALITY_ESTIMATE_V2
+ ;
+
+ /**
+ * Converts the metadata name to bytes.
+ */
+ public byte[] toBytes() {
+ return ("gemfire." + name()).getBytes();
+ }
+
+ /**
+ * Converts the byte form of the name to an enum.
+ *
+ * @param key
+ * the key as bytes
+ * @return the enum form
+ */
+ public static Meta fromBytes(byte[] key) {
+ return Meta.valueOf(new String(key).substring("gemfire.".length()));
+ }
+ }
+}