Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:45:24 UTC
[10/60] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
deleted file mode 100644
index e8abb38..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
+++ /dev/null
@@ -1,2004 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.gemstone.gemfire.internal.hll.CardinalityMergeException;
-import com.gemstone.gemfire.internal.hll.HyperLogLog;
-import com.gemstone.gemfire.internal.hll.ICardinality;
-import com.gemstone.gemfire.internal.hll.MurmurHash;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.ShutdownHookManager;
-
-import com.gemstone.gemfire.InternalGemFireException;
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReaderActivityListener;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.Meta;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
-import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.IOOperation;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * Manages sorted oplog files for a bucket. One instance per bucket exists in
- * each PR
- *
- */
-public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHoplogPersistedEvent> {
- public static final int AVG_NUM_KEYS_PER_INDEX_BLOCK = 200;
-
- // all valid sorted hoplogs follow this name pattern
- public static final String SORTED_HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
- + FLUSH_HOPLOG_EXTENSION + "|" + MINOR_HOPLOG_EXTENSION + "|"
- + MAJOR_HOPLOG_EXTENSION + ")";
- public static final Pattern SORTED_HOPLOG_PATTERN = Pattern.compile(SORTED_HOPLOG_REGEX);
-
- //Amount of time before deleting old temporary files
- final long TMP_FILE_EXPIRATION_TIME_MS = Long.getLong(HoplogConfig.TMP_FILE_EXPIRATION, HoplogConfig.TMP_FILE_EXPIRATION_DEFAULT);
-
- static float RATIO = HoplogConfig.COMPACTION_FILE_RATIO_DEFAULT;
-
- // Compacter for this bucket
- private Compactor compacter;
-
- private final HoplogReadersController hoplogReadersController;
- private AtomicLong previousCleanupTimestamp = new AtomicLong(Long.MIN_VALUE);
-
- /**
- * The default HLL constant; gives an accuracy of about 3.25%.
- * Public only for testing the upgrade from 1.3 to 1.4
- */
- public static double HLL_CONSTANT = 0.03;
- /**
- * This estimator keeps track of this bucket's entry count. This value is
- * affected by flush and compaction cycles
- */
- private volatile ICardinality bucketSize = new HyperLogLog(HLL_CONSTANT);
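
The 3.25% figure quoted above follows from the standard HyperLogLog error
bound of 1.04/sqrt(m). A minimal sketch of the arithmetic, assuming a
stream-lib style register sizing of log2m = floor(log2((1.106/rsd)^2)); the
sizing formula and class name are illustrative assumptions, not taken from
this file:

    // Hypothetical illustration; not part of the original source.
    public class HllAccuracySketch {
      public static void main(String[] args) {
        double rsd = 0.03;                                  // HLL_CONSTANT
        // assumed sizing: pick the register count m as a power of two
        int log2m = (int) (Math.log(Math.pow(1.106 / rsd, 2)) / Math.log(2));
        int m = 1 << log2m;                                 // 1024 registers
        double stdError = 1.04 / Math.sqrt(m);              // 0.0325, i.e. ~3.25%
        System.out.printf("m=%d, expected error=%.2f%%%n", m, stdError * 100);
      }
    }
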
- //A set of tmp files that existed when this bucket organizer was originally
- //created. These may still be held open by the old primary, or they may be
- //abandoned files.
- private LinkedList<FileStatus> oldTmpFiles;
-
- private ConcurrentMap<Hoplog, Boolean> tmpFiles = new ConcurrentHashMap<Hoplog, Boolean>();
-
- protected volatile boolean organizerClosed = false;
-
- /**
- * For the 1.4 release we are changing the HLL_CONSTANT which will make the
- * old persisted HLLs incompatible with the new HLLs. To fix this we will
- * force a major compaction when the system starts up so that we will only
- * have new HLLs in the system (see bug 51403)
- */
- private boolean startCompactionOnStartup = false;
-
- /**
- * @param region
- * Region manager instance. The HDFS listener, stats collector,
- * file system, etc. are shared by all buckets of a region and
- * provided by the region manager instance
- * @param bucketId bucket id to be managed by this organizer
- * @throws IOException
- */
- public HdfsSortedOplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
- super(region, bucketId);
-
- String val = System.getProperty(HoplogConfig.COMPACTION_FILE_RATIO);
- try {
- RATIO = Float.parseFloat(val);
- } catch (Exception e) {
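- // the ratio property is optional; ignore a malformed value and keep the default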
- }
-
- hoplogReadersController = new HoplogReadersController();
-
- // initialize with all the files in the directory
- List<Hoplog> hoplogs = identifyAndLoadSortedOplogs(true);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Initializing bucket with existing hoplogs, count = " + hoplogs.size(), logPrefix);
- }
- for (Hoplog hoplog : hoplogs) {
- addSortedOplog(hoplog, false, true);
- }
-
- // initialize sequence to the current maximum
- sequence = new AtomicInteger(findMaxSequenceNumber(hoplogs));
-
- initOldTmpFiles();
-
- FileSystem fs = store.getFileSystem();
- Path cleanUpIntervalPath = new Path(store.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
- if (!fs.exists(cleanUpIntervalPath)) {
- long intervalDurationMillis = store.getPurgeInterval() * 60 * 1000;
- HoplogUtil.exposeCleanupIntervalMillis(fs, cleanUpIntervalPath, intervalDurationMillis);
- }
-
- if (startCompactionOnStartup) {
- forceCompactionOnVersionUpgrade();
- if (logger.isInfoEnabled()) {
- logger.info(LocalizedStrings.HOPLOG_MAJOR_COMPACTION_SCHEDULED_FOR_BETTER_ESTIMATE);
- }
- }
- }
-
- /**
- * Iterates over the input buffer and persists it in a new sorted oplog. This operation is
- * synchronous and blocks the thread.
- */
- @Override
- public void flush(Iterator<? extends QueuedPersistentEvent> iterator, final int count)
- throws IOException, ForceReattemptException {
- assert iterator != null;
-
- if (logger.isDebugEnabled())
- logger.debug("{}Initializing flush operation", logPrefix);
-
- final Hoplog so = getTmpSortedOplog(null, FLUSH_HOPLOG_EXTENSION);
- HoplogWriter writer = null;
- ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
-
- // variables for updating stats
- long start = stats.getFlush().begin();
- int byteCount = 0;
-
- try {
- /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
- //HeapDataOutputStream out = new HeapDataOutputStream();
-
- try {
- writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
- @Override
- public HoplogWriter call() throws Exception {
- return so.createWriter(count);
- }
- });
- } catch (Exception e) {
- if (e instanceof IOException) {
- throw (IOException)e;
- }
- throw new IOException(e);
- }
-
- while (iterator.hasNext() && !this.organizerClosed) {
- HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
-
- QueuedPersistentEvent item = iterator.next();
- item.toHoplogEventBytes(out);
- byte[] valueBytes = out.toByteArray();
- writer.append(item.getRawKey(), valueBytes);
-
- // add key length and value length to stats byte counter
- byteCount += (item.getRawKey().length + valueBytes.length);
-
- // increment size only if entry is not deleted
- if (!isDeletedEntry(valueBytes, 0)) {
- int hash = MurmurHash.hash(item.getRawKey());
- localHLL.offerHashed(hash);
- }
- /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
- //out.clearForReuse();
- }
- if (organizerClosed)
- throw new BucketMovedException("The current bucket is moved BucketID: "+
- this.bucketId + " Region name: " + this.regionManager.getRegion().getName());
-
- // append completed. provide cardinality and close writer
- writer.close(buildMetaData(localHLL));
- writer = null;
- } catch (IOException e) {
- stats.getFlush().error(start);
- try {
- e = handleWriteHdfsIOError(writer, so, e);
- } finally {
- //Set the writer to null because handleWriteHDFSIOError has
- //already closed the writer.
- writer = null;
- }
- throw e;
- } catch (BucketMovedException e) {
- stats.getFlush().error(start);
- deleteTmpFile(writer, so);
- writer = null;
- throw e;
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
-
- try{
-
- // ping secondaries before making the file a legitimate file, to ensure
- // that in case of split brain no other VM has taken over as primary. #50110.
- pingSecondaries();
-
- // rename file and check if renaming was successful
- synchronized (changePrimarylockObject) {
- if (!organizerClosed)
- makeLegitimate(so);
- else
- throw new BucketMovedException("The current bucket is moved BucketID: "+
- this.bucketId + " Region name: " + this.regionManager.getRegion().getName());
- }
- try {
- so.getSize();
- } catch (IllegalStateException e) {
- throw new IOException("Failed to rename hoplog file:" + so.getFileName());
- }
-
- //Disabling this assertion due to bug 49740
- // check to make sure the sequence number is correct
-// if (ENABLE_INTEGRITY_CHECKS) {
-// Assert.assertTrue(getSequenceNumber(so) == findMaxSequenceNumber(identifyAndLoadSortedOplogs(false)),
-// "Invalid sequence number detected for " + so.getFileName());
-// }
-
- // record the file for future maintenance and reads
- addSortedOplog(so, false, true);
- stats.getFlush().end(byteCount, start);
- incrementDiskUsage(so.getSize());
- } catch (BucketMovedException e) {
- stats.getFlush().error(start);
- deleteTmpFile(writer, so);
- writer = null;
- throw e;
- } catch (IOException e) {
- stats.getFlush().error(start);
- logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
- throw e;
- }
-
- submitCompactionRequests();
- }
-
-
- /**
- * store cardinality information in metadata
- * @param localHLL the hll estimate for this hoplog only
- */
- private EnumMap<Meta, byte[]> buildMetaData(ICardinality localHLL) throws IOException {
- EnumMap<Meta, byte[]> map = new EnumMap<Hoplog.Meta, byte[]>(Meta.class);
- map.put(Meta.LOCAL_CARDINALITY_ESTIMATE_V2, localHLL.getBytes());
- return map;
- }
-
- private void submitCompactionRequests() throws IOException {
- CompactionRequest req;
-
- // determine if a major compaction is needed and create a compaction request
- // with compaction manager
- if (store.getMajorCompaction()) {
- if (isMajorCompactionNeeded()) {
- req = new CompactionRequest(regionFolder, bucketId, getCompactor(), true);
- HDFSCompactionManager.getInstance(store).submitRequest(req);
- }
- }
-
- // submit a minor compaction task. It will be ignored if there is no work to
- // be done.
- if (store.getMinorCompaction()) {
- req = new CompactionRequest(regionFolder, bucketId, getCompactor(), false);
- HDFSCompactionManager.getInstance(store).submitRequest(req);
- }
- }
-
- /**
- * @return true if the oldest hoplog was created more than one major compaction interval ago
- */
- private boolean isMajorCompactionNeeded() throws IOException {
- // major compaction interval in milliseconds
-
- long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
-
- Hoplog oplog = hoplogReadersController.getOldestHoplog();
- if (oplog == null) {
- return false;
- }
-
- long oldestFileTime = oplog.getModificationTimeStamp();
- long now = System.currentTimeMillis();
- if (logger.isDebugEnabled()) {
- logger.debug("{}Checking oldest hop " + oplog.getFileName()
- + " for majorCompactionInterval=" + majorCInterval
- + " + now=" + now, logPrefix);
- }
- if (oldestFileTime > 0L && oldestFileTime < (now - majorCInterval)) {
- return true;
- }
- return false;
- }
-
- @Override
- public SortedHoplogPersistedEvent read(byte[] key) throws IOException {
- long startTime = stats.getRead().begin();
- String user = logger.isDebugEnabled() ? "Read" : null;
-
- // collect snapshot of hoplogs
- List<TrackedReference<Hoplog>> hoplogs = null;
- hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
- try {
- // search for the key in order starting with the youngest oplog
- for (TrackedReference<Hoplog> hoplog : hoplogs) {
- HoplogReader reader = hoplog.get().getReader();
- byte[] val = reader.read(key);
- if (val != null) {
- // value found in a younger hoplog. stop iteration
- SortedHoplogPersistedEvent eventObj = deserializeValue(val);
- stats.getRead().end(val.length, startTime);
- return eventObj;
- }
- }
- } catch (IllegalArgumentException e) {
- if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
- throw handleIOError((IOException) e.getCause());
- } else {
- throw e;
- }
- } catch (IOException e) {
- throw handleIOError(e);
- } catch (HDFSIOException e) {
- throw handleIOError(e);
- } finally {
- hoplogReadersController.releaseHoplogs(hoplogs, user);
- }
-
- stats.getRead().end(0, startTime);
- return null;
- }
-
- protected IOException handleIOError(IOException e) {
- // expose the error wrapped inside remote exception
- if (e instanceof RemoteException) {
- return ((RemoteException) e).unwrapRemoteException();
- }
-
- checkForSafeError(e);
-
- // it is not a safe error. let the caller handle it
- return e;
- }
-
- protected HDFSIOException handleIOError(HDFSIOException e) {
- checkForSafeError(e);
- return e;
- }
-
- protected void checkForSafeError(Exception e) {
- boolean safeError = ShutdownHookManager.get().isShutdownInProgress();
- if (safeError) {
- // IOException because of closed file system. This happens when the member
- // is shutting down
- if (logger.isDebugEnabled())
- logger.debug("IO error caused by filesystem shutdown", e);
- throw new CacheClosedException("IO error caused by filesystem shutdown", e);
- }
-
- if(isClosed()) {
- //If the hoplog organizer is closed, throw an exception to indicate the
- //caller should retry on the new primary.
- throw new PrimaryBucketException(e);
- }
- }
-
- protected IOException handleWriteHdfsIOError(HoplogWriter writer, Hoplog so, IOException e)
- throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Handle write error:" + so, logPrefix);
- }
-
- closeWriter(writer);
- // add to the janitor queue
- tmpFiles.put(so, Boolean.TRUE);
-
- return handleIOError(e);
- }
-
- private void deleteTmpFile(HoplogWriter writer, Hoplog so) {
- closeWriter(writer);
-
- // delete the temporary hoplog
- try {
- if (so != null) {
- so.delete();
- }
- } catch (IOException e1) {
- logger.info(e1);
- }
- }
-
- private void closeWriter(HoplogWriter writer) {
- if (writer != null) {
- // close writer before deleting it
- try {
- writer.close();
- } catch (Throwable e1) {
- // closing the hoplog will fail if no connections to the datanode are
- // available. Try to delete the file on the namenode
- if(!isClosed()) {
- logger.info(e1);
- }
- }
- }
- }
-
- /**
- * Closes the hoplog and suppresses IO errors during reader close. Suppressing IO errors
- * when the organizer is closing or a hoplog becomes inactive lets the system
- * continue freeing other resources. It could potentially lead to socket
- * leaks though.
- */
- private void closeReaderAndSuppressError(Hoplog hoplog, boolean clearCache) {
- try {
- hoplog.close();
- } catch (IOException e) {
- // expose the error wrapped inside remote exception
- if (e instanceof RemoteException) {
- e = ((RemoteException) e).unwrapRemoteException();
- }
- logger.info(e);
- }
- }
-
- @Override
- public BucketIterator scan() throws IOException {
- String user = logger.isDebugEnabled() ? "Scan" : null;
- List<TrackedReference<Hoplog>> hoplogs = null;
- BucketIterator iter = null;
- try {
- hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
- iter = new BucketIterator(hoplogs);
- return iter;
- } finally {
- // Normally the hoplogs will be released when the iterator is closed. The
- // hoplogs must be released only if creating the iterator has failed.
- if (iter == null) {
- hoplogReadersController.releaseHoplogs(hoplogs, user);
- }
- }
- }
-
- @Override
- public BucketIterator scan(byte[] from, byte[] to) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public BucketIterator scan(byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public HoplogIterator<byte[], SortedHoplogPersistedEvent> scan(
- long startOffset, long length) throws IOException {
- throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
- }
-
- @Override
- public void close() throws IOException {
- super.close();
-
- synchronized (changePrimarylockObject) {
- organizerClosed = true;
- }
- //Suspend compaction
- getCompactor().suspend();
-
- //Close the readers controller.
- hoplogReadersController.close();
-
- previousCleanupTimestamp.set(Long.MIN_VALUE);
-
- }
-
- /**
- * This method is invoked on the secondary node, which needs to update its
- * data structures
- */
- @Override
- public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
- throws IOException {
- for (Hoplog oplog : oplogs) {
- addSortedOplog(oplog, false, true);
- }
- }
-
- @Override
- public long sizeEstimate() {
- return this.bucketSize.cardinality();
- }
-
- private void addSortedOplog(Hoplog so, boolean notify, boolean addsToBucketSize)
- throws IOException {
- if (!hoplogReadersController.addSortedOplog(so)) {
- so.close();
- throw new InternalGemFireException("Failed to add " + so);
- }
-
- String user = logger.isDebugEnabled() ? "Add" : null;
- if (addsToBucketSize) {
- TrackedReference<Hoplog> ref = null;
- try {
- ref = hoplogReadersController.trackHoplog(so, user);
- synchronized (bucketSize) {
- ICardinality localHLL = ref.get().getEntryCountEstimate();
- if (localHLL != null) {
- bucketSize = mergeHLL(bucketSize, localHLL);
- }
- }
- } finally {
- if (ref != null) {
- hoplogReadersController.releaseHoplog(ref, user);
- }
- }
- }
-
- if (notify && listener != null) {
- listener.hoplogCreated(regionFolder, bucketId, so);
- }
- }
-
- private void reEstimateBucketSize() throws IOException {
- ICardinality global = null;
- String user = logger.isDebugEnabled() ? "HLL" : null;
- List<TrackedReference<Hoplog>> hoplogs = null;
- try {
- hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
- global = new HyperLogLog(HLL_CONSTANT);
- for (TrackedReference<Hoplog> hop : hoplogs) {
- global = mergeHLL(global, hop.get().getEntryCountEstimate());
- }
- } finally {
- hoplogReadersController.releaseHoplogs(hoplogs, user);
- }
- bucketSize = global;
- }
-
- protected ICardinality mergeHLL(ICardinality global, ICardinality local)
- /*throws IOException*/ {
- try {
- return global.merge(local);
- } catch (CardinalityMergeException e) {
- // uncomment this after the 1.4 release
- //throw new InternalGemFireException(e.getLocalizedMessage(), e);
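- // a pre-1.4 hoplog carries an HLL built with an incompatible constant;
- // leave it out of the estimate and schedule a major compaction on startup
- // to rewrite old hoplogs with the new constant (see bug 51403 above)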
- startCompactionOnStartup = true;
- return global;
- }
- }
-
- private void removeSortedOplog(TrackedReference<Hoplog> so, boolean notify) throws IOException {
- hoplogReadersController.removeSortedOplog(so);
-
- // release lock before notifying listeners
- if (notify && listener != null) {
- listener.hoplogDeleted(regionFolder, bucketId, so.get());
- }
- }
-
- private void notifyCompactionListeners(boolean isMajor) {
- listener.compactionCompleted(regionFolder, bucketId, isMajor);
- }
-
- /**
- * This method is invoked on the secondary node, which needs to update its
- * data structures
- * @throws IOException
- */
- @Override
- public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public synchronized Compactor getCompactor() {
- if (compacter == null) {
- compacter = new HoplogCompactor();
- }
- return compacter;
- }
-
- @Override
- protected Hoplog getHoplog(Path hoplogPath) throws IOException {
- Hoplog so = new HFileSortedOplog(store, hoplogPath, store.getBlockCache(), stats, store.getStats());
- return so;
- }
-
- /**
- * Locks the sorted oplog collection, removes the oplog, and renames it for later deletion
- * @throws IOException
- */
- void markSortedOplogForDeletion(List<TrackedReference<Hoplog>> targets, boolean notify) throws IOException {
- for (int i = targets.size(); i > 0; i--) {
- TrackedReference<Hoplog> so = targets.get(i - 1);
- removeSortedOplog(so, true);
- if (!store.getFileSystem().exists(new Path(bucketPath, so.get().getFileName()))) {
- // the hoplog does not even exist on file system. Skip remaining steps
- continue;
- }
- addExpiryMarkerForAFile(so.get());
- }
- }
-
- /**
- * Deletes expired hoplogs and expiry markers from the file system. Calculates
- * a target timestamp based on the cleanup interval, then gets the list of
- * target hoplogs. It also updates the disk usage state
- *
- * @return number of files deleted
- */
- synchronized int initiateCleanup() throws IOException {
- int conf = store.getPurgeInterval();
- // minutes to milliseconds
- long intervalDurationMillis = conf * 60 * 1000;
- // Any expired hoplog with timestamp less than targetTS is a delete
- // candidate.
- long targetTS = System.currentTimeMillis() - intervalDurationMillis;
- if (logger.isDebugEnabled()) {
- logger.debug("Target timestamp for expired hoplog deletion " + targetTS, logPrefix);
- }
- // avoid too frequent cleanup invocations. Exit this invocation if the
- // previous cleanup executed within 10% of the cleanup interval (e.g. with a
- // 60 minute purge interval, cleanups less than 6 minutes apart are skipped)
- if (previousCleanupTimestamp.get() > targetTS
- && (previousCleanupTimestamp.get() - targetTS) < (intervalDurationMillis / 10)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Skip cleanup, previous " + previousCleanupTimestamp.get(), logPrefix);
- }
- return 0;
- }
-
- List<FileStatus> targets = getOptimizationTargets(targetTS);
- return deleteExpiredFiles(targets);
- }
-
- protected int deleteExpiredFiles(List<FileStatus> targets) throws IOException {
- if (targets == null) {
- return 0;
- }
-
- for (FileStatus file : targets) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Deleting file: " + file.getPath(), logPrefix);
- }
- store.getFileSystem().delete(file.getPath(), false);
-
- if (isClosed()) {
- if (logger.isDebugEnabled())
- logger.debug("{}Expiry file cleanup interupted by bucket close", logPrefix);
- return 0;
- }
- incrementDiskUsage(-1 * file.getLen());
- }
-
- previousCleanupTimestamp.set(System.currentTimeMillis());
- return targets.size();
- }
-
- /**
- * @param ts
- * target timestamp
- * @return list of hoplogs whose expiry markers were created before the target
- * timestamp, along with the expiry markers themselves.
- * @throws IOException
- */
- protected List<FileStatus> getOptimizationTargets(long ts) throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Identifying optimization targets " + ts, logPrefix);
- }
-
- List<FileStatus> deleteTargets = new ArrayList<FileStatus>();
- FileStatus[] markers = getExpiryMarkers();
- if (markers != null) {
- for (FileStatus marker : markers) {
- String name = truncateExpiryExtension(marker.getPath().getName());
- long timestamp = marker.getModificationTime();
-
- // expired minor compacted files are not being used anywhere. These can
- // be removed immediately. All the other expired files should be removed
- // when the files have aged
- boolean isTarget = false;
-
- if (name.endsWith(MINOR_HOPLOG_EXTENSION)) {
- isTarget = true;
- } else if (timestamp < ts && name.endsWith(FLUSH_HOPLOG_EXTENSION)) {
- isTarget = true;
- } else if (timestamp < ts && name.endsWith(MAJOR_HOPLOG_EXTENSION)) {
- long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
- if (timestamp < (System.currentTimeMillis() - majorCInterval)) {
- isTarget = true;
- }
- }
- if (!isTarget) {
- continue;
- }
-
- // if the file is still being read, do not delete or rename it
- TrackedReference<Hoplog> used = hoplogReadersController.getInactiveHoplog(name);
- if (used != null) {
- if (used.inUse() && logger.isDebugEnabled()) {
- logger.debug("{}Optimizer: found active expired hoplog:" + name, logPrefix);
- } else if (logger.isDebugEnabled()) {
- logger.debug("{}Optimizer: found open expired hoplog:" + name, logPrefix);
- }
- continue;
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Delete target identified " + marker.getPath(), logPrefix);
- }
-
- deleteTargets.add(marker);
- Path hoplogPath = new Path(bucketPath, name);
- if (store.getFileSystem().exists(hoplogPath)) {
- FileStatus hoplog = store.getFileSystem().getFileStatus(hoplogPath);
- deleteTargets.add(hoplog);
- }
- }
- }
- return deleteTargets;
- }
-
- /**
- * Returns a list of hoplogs present in the bucket's directory, expected to be called during
- * hoplog set initialization
- */
- List<Hoplog> identifyAndLoadSortedOplogs(boolean countSize) throws IOException {
- FileSystem fs = store.getFileSystem();
- if (! fs.exists(bucketPath)) {
- return new ArrayList<Hoplog>();
- }
-
- FileStatus allFiles[] = fs.listStatus(bucketPath);
- ArrayList<FileStatus> validFiles = new ArrayList<FileStatus>();
- for (FileStatus file : allFiles) {
- // All hoplog files contribute to disk usage
- Matcher matcher = HOPLOG_NAME_PATTERN.matcher(file.getPath().getName());
- if (! matcher.matches()) {
- // not a hoplog
- continue;
- }
-
- // account for the disk used by this file
- if (countSize) {
- incrementDiskUsage(file.getLen());
- }
-
- // All valid hoplog files must match the regex
- matcher = SORTED_HOPLOG_PATTERN.matcher(file.getPath().getName());
- if (matcher.matches()) {
- validFiles.add(file);
- }
- }
-
- FileStatus[] markers = getExpiryMarkers();
- FileStatus[] validHoplogs = filterValidHoplogs(
- validFiles.toArray(new FileStatus[validFiles.size()]), markers);
-
- ArrayList<Hoplog> results = new ArrayList<Hoplog>();
- if (validHoplogs == null || validHoplogs.length == 0) {
- return results;
- }
-
- for (int i = 0; i < validHoplogs.length; i++) {
- // Skip directories
- if (validHoplogs[i].isDirectory()) {
- continue;
- }
-
- final Path p = validHoplogs[i].getPath();
- // skip empty file
- if (fs.getFileStatus(p).getLen() <= 0) {
- continue;
- }
-
- Hoplog hoplog = new HFileSortedOplog(store, p, store.getBlockCache(), stats, store.getStats());
- results.add(hoplog);
- }
-
- return results;
- }
-
- private static int findMaxSequenceNumber(List<Hoplog> hoplogs) throws IOException {
- int maxSeq = 0;
- for (Hoplog hoplog : hoplogs) {
- maxSeq = Math.max(maxSeq, getSequenceNumber(hoplog));
- }
- return maxSeq;
- }
-
- /**
- * @return the sequence number associated with a hoplog file
- */
- static int getSequenceNumber(Hoplog hoplog) {
- Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(hoplog.getFileName());
- boolean matched = matcher.find();
- assert matched;
- return Integer.valueOf(matcher.group(3));
- }
-
- protected FileStatus[] getExpiryMarkers() throws IOException {
- FileSystem fs = store.getFileSystem();
- if (hoplogReadersController.hoplogs == null
- || hoplogReadersController.hoplogs.size() == 0) {
- // there are no hoplogs in the system. Maybe the bucket does not exist
- // at all.
- if (!fs.exists(bucketPath)) {
- if (logger.isDebugEnabled())
- logger.debug("{}This bucket is unused, skipping expired hoplog check", logPrefix);
- return null;
- }
- }
-
- FileStatus files[] = FSUtils.listStatus(fs, bucketPath, new PathFilter() {
- @Override
- public boolean accept(Path file) {
- // All expired hoplogs end with the expiry extension and must match the valid file regex
- String fileName = file.getName();
- if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
- return false;
- }
- fileName = truncateExpiryExtension(fileName);
- Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(fileName);
- return matcher.find();
- }
-
- });
- return files;
- }
-
- @Override
- public void clear() throws IOException {
- //Suspend compaction while we are doing the clear. This
- //aborts the currently in progress compaction.
- getCompactor().suspend();
-
- // while compaction is suspended, clear method marks hoplogs for deletion
- // only. Files will be removed by cleanup thread after active gets and
- // iterations are completed
- String user = logger.isDebugEnabled() ? "clear" : null;
- List<TrackedReference<Hoplog>> oplogs = null;
- try {
- oplogs = hoplogReadersController.getTrackedSortedOplogList(user);
- markSortedOplogForDeletion(oplogs, true);
- } finally {
- if (oplogs != null) {
- hoplogReadersController.releaseHoplogs(oplogs, user);
- }
- //Resume compaction
- getCompactor().resume();
- }
- }
-
- /**
- * Performs the following activities
- * <UL>
- * <LI>Submits compaction requests as needed
- * <LI>Deletes tmp files which the system failed to removed earlier
- */
- @Override
- public void performMaintenance() throws IOException {
- long startTime = System.currentTimeMillis();
-
- if (logger.isDebugEnabled())
- logger.debug("{}Executing bucket maintenance", logPrefix);
-
- submitCompactionRequests();
- hoplogReadersController.closeInactiveHoplogs();
- initiateCleanup();
-
- cleanupTmpFiles();
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Time spent in bucket maintenance (in ms): "
- + (System.currentTimeMillis() - startTime), logPrefix);
- }
- }
-
- @Override
- public Future<CompactionStatus> forceCompaction(boolean isMajor) {
- CompactionRequest request = new CompactionRequest(regionFolder, bucketId,
- getCompactor(), isMajor, true/*force*/);
- return HDFSCompactionManager.getInstance(store).submitRequest(request);
- }
-
- private Future<CompactionStatus> forceCompactionOnVersionUpgrade() {
- CompactionRequest request = new CompactionRequest(regionFolder, bucketId, getCompactor(), true, true, true);
- return HDFSCompactionManager.getInstance(store).submitRequest(request);
- }
-
- @Override
- public long getLastMajorCompactionTimestamp() {
- long ts = 0;
- String user = logger.isDebugEnabled() ? "StoredProc" : null;
- List<TrackedReference<Hoplog>> hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
- try {
- for (TrackedReference<Hoplog> hoplog : hoplogs) {
- String fileName = hoplog.get().getFileName();
- Matcher file = HOPLOG_NAME_PATTERN.matcher(fileName);
- if (file.matches() && fileName.endsWith(MAJOR_HOPLOG_EXTENSION)) {
- ts = getHoplogTimestamp(file);
- break;
- }
- }
- } finally {
- hoplogReadersController.releaseHoplogs(hoplogs, user);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("{}HDFS: for bucket:"+getRegionBucketStr()+" returning last major compaction timestamp "+ts, logPrefix);
- }
- return ts;
- }
-
- private void initOldTmpFiles() throws IOException {
- FileSystem fs = store.getFileSystem();
- if (! fs.exists(bucketPath)) {
- return;
- }
-
- oldTmpFiles = new LinkedList<FileStatus>(Arrays.asList(fs.listStatus(bucketPath, new TmpFilePathFilter())));
- }
-
- private void cleanupTmpFiles() throws IOException {
- if(oldTmpFiles == null && tmpFiles == null) {
- return;
- }
-
- if (oldTmpFiles != null) {
- FileSystem fs = store.getFileSystem();
- long now = System.currentTimeMillis();
- for (Iterator<FileStatus> itr = oldTmpFiles.iterator(); itr.hasNext();) {
- FileStatus file = itr.next();
- if(file.getModificationTime() + TMP_FILE_EXPIRATION_TIME_MS < now) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Deleting temporary file:" + file.getPath(), logPrefix);
- }
- fs.delete(file.getPath(), false);
- itr.remove();
- }
- }
- }
- if (tmpFiles != null) {
- for (Hoplog so : tmpFiles.keySet()) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Deleting temporary file:" + so.getFileName(), logPrefix);
- }
- deleteTmpFile(null, so);
- }
- }
- }
-
- /**
- * Executes tiered compaction of hoplog files. One compacter instance per bucket will exist
- */
- protected class HoplogCompactor implements Compactor {
- private volatile boolean suspend = false;
-
- // the following boolean will be used to synchronize minor compaction
- private AtomicBoolean isMinorCompactionActive = new AtomicBoolean(false);
- // the following boolean will be used to synchronize major compaction
- private AtomicBoolean isMajorCompactionActive = new AtomicBoolean(false);
- // the following integer tracks the max sequence number amongst the
- // target files being major compacted. This value will be used to prevent
- // concurrent MajorC and minorC. MinorC is preempted in case of an
- // overlap. This object is also used as a lock. The lock is acquired before
- // identifying compaction targets and before marking targets for expiry
- final AtomicInteger maxMajorCSeqNum = new AtomicInteger(-1);
-
- @Override
- public void suspend() {
- long wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
- this.suspend=true;
- //this forces the compact method to finish.
- while (isMajorCompactionActive.get() || isMinorCompactionActive.get()) {
- if (wait < 0) {
- wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
- String act = isMajorCompactionActive.get() ? "MajorC" : "MinorC";
- logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_SUSPEND_OF_0_FAILED_IN_1, new Object[] {act, wait}));
- break;
- }
- try {
- TimeUnit.MILLISECONDS.sleep(50);
- wait -= 50;
- } catch (InterruptedException e) {
- break;
- }
- }
- }
-
- @Override
- public void resume() {
- this.suspend = false;
- }
-
- @Override
- public boolean isBusy(boolean isMajor) {
- if (isMajor) {
- return isMajorCompactionActive.get();
- } else {
- return isMinorCompactionActive.get();
- }
- }
-
- /**
- * compacts hoplogs. The method takes a minor or major compaction "lock" to
- * prevent concurrent execution of compaction cycles. A possible improvement
- * is to allow parallel execution of minor compaction if the sets of
- * hoplogs being compacted are disjoint.
- */
- @Override
- public boolean compact(boolean isMajor, boolean isForced) throws IOException {
- if(suspend) {
- return false;
- }
-
- String extension = null;
- IOOperation compactionStats = null;
- long startTime = 0;
- final AtomicBoolean lock;
- Hoplog compactedHoplog = null;
- List<TrackedReference<Hoplog>> targets = null;
- String user = logger.isDebugEnabled() ? (isMajor ? "MajorC" : "MinorC") : null;
-
- if (isMajor) {
- lock = isMajorCompactionActive;
- extension = MAJOR_HOPLOG_EXTENSION;
- compactionStats = stats.getMajorCompaction();
- } else {
- lock = isMinorCompactionActive;
- extension = MINOR_HOPLOG_EXTENSION;
- compactionStats = stats.getMinorCompaction();
- }
-
- // final check before beginning compaction. Return if compaction is active
- if (! lock.compareAndSet(false, true)) {
- if (isMajor) {
- if (logger.isDebugEnabled())
- logger.debug("{}Major compaction already active. Ignoring new request", logPrefix);
- } else {
- if (logger.isDebugEnabled())
- logger.debug("Minor compaction already active. Ignoring new request", logPrefix);
- }
- return false;
- }
-
- try {
- if(suspend) {
- return false;
- }
-
- // variables for updating stats
- startTime = compactionStats.begin();
-
- int seqNum = -1;
- int lastKnownMajorCSeqNum;
- synchronized (maxMajorCSeqNum) {
- lastKnownMajorCSeqNum = maxMajorCSeqNum.get();
- targets = hoplogReadersController.getTrackedSortedOplogList(user);
- getCompactionTargets(isMajor, targets, lastKnownMajorCSeqNum);
- if (targets != null && targets.size() > 0) {
- targets = Collections.unmodifiableList(targets);
- seqNum = getSequenceNumber(targets.get(0).get());
- if (isMajor) {
- maxMajorCSeqNum.set(seqNum);
- }
- }
- }
-
- if (targets == null || targets.isEmpty() || (!isMajor && targets.size() == 1 && !isForced)) {
- if (logger.isDebugEnabled()){
- logger.debug("{}Skipping compaction, too few hoplops to compact. Major?" + isMajor, logPrefix);
- }
-
- compactionStats.end(0, startTime);
- return true;
- }
-
- //In case we only have one major compacted file, we don't need to run major compaction to
- //generate a copy of the same content
- if (targets.size() == 1 && !isForced) {
- String hoplogName = targets.get(0).get().getFileName();
- if (hoplogName.endsWith(MAJOR_HOPLOG_EXTENSION)){
- if (logger.isDebugEnabled()){
- logger.debug("{}Skipping compaction, no need to compact a major compacted file. Major?" + isMajor, logPrefix);
- }
- compactionStats.end(0, startTime);
- return true;
- }
- }
-
- if (logger.isDebugEnabled()) {
- for (TrackedReference<Hoplog> target : targets) {
- if (logger.isDebugEnabled()) {
- fineLog("Target:", target, " size:", target.get().getSize());
- }
- }
- }
-
- // Create a temporary hoplog for the compacted hoplog. The compacted hoplog
- // will have the same seq number as the youngest target file. Any
- // hoplog younger than the target hoplogs will have a higher sequence number
- compactedHoplog = getTmpSortedOplog(seqNum, extension);
-
- long byteCount;
- try {
- byteCount = fillCompactionHoplog(isMajor, targets, compactedHoplog, lastKnownMajorCSeqNum);
- compactionStats.end(byteCount, startTime);
- } catch (InterruptedException e) {
- if (logger.isDebugEnabled())
- logger.debug("{}Compaction execution suspended", logPrefix);
- compactionStats.error(startTime);
- return false;
- } catch (ForceReattemptException e) {
- if (logger.isDebugEnabled())
- logger.debug("{}Compaction execution suspended", logPrefix);
- compactionStats.error(startTime);
- return false;
- }
-
- // creation of the compacted hoplog completed, it's time to use it for
- // reading. Before using it, make sure minorC and majorC were not
- // executing on overlapping sets of files. All targets can be marked for
- // expiration. Notify listener if configured. Update bucket size
- synchronized (maxMajorCSeqNum) {
- if (!isMajor && isMinorMajorOverlap(targets, maxMajorCSeqNum.get())) {
- // MajorC is higher priority. In case of any overlap kill minorC
- if (logger.isDebugEnabled())
- logger.debug("{}Interrupting MinorC for a concurrent MajorC", logPrefix);
- compactionStats.error(startTime);
- return false;
- }
- addSortedOplog(compactedHoplog, true, false);
- markSortedOplogForDeletion(targets, true);
- }
- } catch (IOException e) {
- compactionStats.error(startTime);
- throw e;
- } finally {
- if (isMajor) {
- maxMajorCSeqNum.set(-1);
- }
- lock.set(false);
- hoplogReadersController.releaseHoplogs(targets, user);
- }
-
- incrementDiskUsage(compactedHoplog.getSize());
- reEstimateBucketSize();
-
- notifyCompactionListeners(isMajor);
- return true;
- }
-
- /**
- * Major compaction compacts all files. Seq number of the youngest file
- * being MajorCed is known. If MinorC is operating on any file with a seq
- * number less than this number, there is an overlap. For example, if MajorC
- * covers seq numbers up to 7 and MinorC picked targets with seq numbers 5, 6
- * and 8, targets 5 and 6 overlap and MinorC must be aborted.
- * @param num
- */
- boolean isMinorMajorOverlap(List<TrackedReference<Hoplog>> targets, int num) {
- if (num < 0 || targets == null || targets.isEmpty()) {
- return false;
- }
-
- for (TrackedReference<Hoplog> hop : targets) {
- if (getSequenceNumber(hop.get()) <= num) {
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * Iterates over targets and writes eligible targets to the output hoplog.
- * Handles creation of iterators and writer and closing it in case of
- * errors.
- */
- public long fillCompactionHoplog(boolean isMajor,
- List<TrackedReference<Hoplog>> targets, Hoplog output, int majorCSeqNum)
- throws IOException, InterruptedException, ForceReattemptException {
-
- HoplogWriter writer = null;
- ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
- HoplogSetIterator mergedIter = null;
- int byteCount = 0;
-
- try {
- // create a merged iterator over the targets and write entries into
- // output hoplog
- mergedIter = new HoplogSetIterator(targets);
- writer = output.createWriter(mergedIter.getRemainingEntryCount());
-
- boolean interrupted = false;
- for (; mergedIter.hasNext(); ) {
- if (suspend) {
- interrupted = true;
- break;
- } else if (!isMajor && maxMajorCSeqNum.get() > majorCSeqNum) {
- // A new major compaction cycle is starting, quit minorC to avoid
- // duplicate work and missing deletes
- if (logger.isDebugEnabled())
- logger.debug("{}Preempting MinorC, new MajorC cycle detected ", logPrefix);
- interrupted = true;
- break;
- }
-
- mergedIter.nextBB();
-
- ByteBuffer k = mergedIter.getKeyBB();
- ByteBuffer v = mergedIter.getValueBB();
-
- boolean isDeletedEntry = isDeletedEntry(v.array(), v.arrayOffset());
- if (isMajor && isDeletedEntry) {
- // it's major compaction, time to ignore deleted entries
- continue;
- }
-
- if (!isDeletedEntry) {
- int hash = MurmurHash.hash(k.array(), k.arrayOffset(), k.remaining(), -1);
- localHLL.offerHashed(hash);
- }
-
- writer.append(k, v);
- byteCount += (k.remaining() + v.remaining());
- }
-
- mergedIter.close();
- mergedIter = null;
-
- writer.close(buildMetaData(localHLL));
- writer = null;
-
- if (interrupted) {
- // If we suspended compaction operations, delete the partially written
- // file and return.
- output.delete();
- throw new InterruptedException();
- }
-
- // ping secondaries before making the file a legitimate file, to ensure
- // that in case of split brain no other VM has taken over as primary. #50110.
- pingSecondaries();
-
- makeLegitimate(output);
- return byteCount;
- } catch (IOException e) {
- e = handleWriteHdfsIOError(writer, output, e);
- writer = null;
- throw e;
- } catch (ForceReattemptException e) {
- output.delete();
- throw e;
- }finally {
- if (mergedIter != null) {
- mergedIter.close();
- }
-
- if (writer != null) {
- writer.close();
- }
- }
- }
-
- /**
- * identifies targets. For major compaction all sorted oplogs will be
- * iterated over. For minor compaction, a smaller, policy-driven set of
- * targets is selected.
- */
- protected void getCompactionTargets(boolean major,
- List<TrackedReference<Hoplog>> targets, int majorCSeqNum) {
- if (!major) {
- getMinorCompactionTargets(targets, majorCSeqNum);
- }
- }
-
- /**
- * list of oplogs most suitable for compaction. The algorithm selects the m
- * smallest oplogs which are not bigger than X in size. Null if valid
- * candidates are not found
- */
- void getMinorCompactionTargets(List<TrackedReference<Hoplog>> targets, int majorCSeqNum)
- {
- List<TrackedReference<Hoplog>> omittedHoplogs = new ArrayList<TrackedReference<Hoplog>>();
-
- // reverse the order of hoplogs in the list. The oldest file becomes the first file.
- Collections.reverse(targets);
-
- // hoplog greater than this size will not be minor-compacted
- final long MAX_COMPACTION_FILE_SIZE;
- // maximum number of files to be included in any compaction cycle
- final int MAX_FILE_COUNT_COMPACTION;
- // minimum number of files that must be present for compaction to be worthwhile
- final int MIN_FILE_COUNT_COMPACTION;
-
- MAX_COMPACTION_FILE_SIZE = ((long)store.getInputFileSizeMax()) * 1024 *1024;
- MAX_FILE_COUNT_COMPACTION = store.getInputFileCountMax();
- MIN_FILE_COUNT_COMPACTION = store.getInputFileCountMin();
-
- try {
- // skip till the first file smaller than the max compaction file size is
- // found. And if MajorC is active, move to a file which is also outside
- // the scope of MajorC
- for (Iterator<TrackedReference<Hoplog>> iterator = targets.iterator(); iterator.hasNext();) {
- TrackedReference<Hoplog> oplog = iterator.next();
- if (majorCSeqNum >= getSequenceNumber(oplog.get())) {
- iterator.remove();
- omittedHoplogs.add(oplog);
- if (logger.isDebugEnabled()){
- fineLog("Overlap with MajorC, excluding hoplog " + oplog.get());
- }
- continue;
- }
-
- if (oplog.get().getSize() > MAX_COMPACTION_FILE_SIZE || oplog.get().getFileName().endsWith(MAJOR_HOPLOG_EXTENSION)) {
- // big file will not be included for minor compaction
- // major compacted file will not be converted to minor compacted file
- iterator.remove();
- omittedHoplogs.add(oplog);
- if (logger.isDebugEnabled()) {
- fineLog("Excluding big hoplog from minor cycle:",
- oplog.get(), " size:", oplog.get().getSize(), " limit:",
- MAX_COMPACTION_FILE_SIZE);
- }
- } else {
- // first small hoplog found, skip the loop
- break;
- }
- }
-
- // If there are too few files, there is no need to perform compaction
- if (targets.size() < MIN_FILE_COUNT_COMPACTION) {
- if (logger.isDebugEnabled()){
- logger.debug("{}Too few hoplogs for minor cycle:" + targets.size(), logPrefix);
- }
- omittedHoplogs.addAll(targets);
- targets.clear();
- return;
- }
-
- float maxGain = Float.MIN_VALUE;
- int bestFrom = -1;
- int bestTo = -1;
-
- // for listSize=5 list, minFile=3; maxIndex=5-3.
- // so from takes values 0,1,2
- int maxIndexForFrom = targets.size() - MIN_FILE_COUNT_COMPACTION;
- for (int from = 0; from <= maxIndexForFrom ; from++) {
- // for listSize=6 list, minFile=3, maxFile=5; minTo=0+3-1, maxTo=0+5-1
- // so to takes values 2,3,4
- int minIndexForTo = from + MIN_FILE_COUNT_COMPACTION - 1;
- int maxIndexForTo = Math.min(from + MAX_FILE_COUNT_COMPACTION, targets.size());
-
- for (int i = minIndexForTo; i < maxIndexForTo; i++) {
- Float gain = computeGain(from, i, targets);
- if (gain == null) {
- continue;
- }
-
- if (gain > maxGain) {
- maxGain = gain;
- bestFrom = from;
- bestTo = i;
- }
- }
- }
-
- if (bestFrom == -1) {
- if (logger.isDebugEnabled())
- logger.debug("{}Failed to find optimal target set for MinorC", logPrefix);
- omittedHoplogs.addAll(targets);
- targets.clear();
- return;
- }
-
- if (logger.isDebugEnabled()) {
- fineLog("MinorCTarget optimal result from:", bestFrom, " to:", bestTo);
- }
-
- // remove hoplogs that do not fall in the bestFrom-bestTo range
- int i = 0;
- for (Iterator<TrackedReference<Hoplog>> iter = targets.iterator(); iter.hasNext();) {
- TrackedReference<Hoplog> hop = iter.next();
- if (i < bestFrom || i > bestTo) {
- iter.remove();
- omittedHoplogs.add(hop);
- }
- i++;
- }
- } finally {
- // release readers of targets not included in the compaction cycle
- String user = logger.isDebugEnabled() ? "MinorC" : null;
- hoplogReadersController.releaseHoplogs(omittedHoplogs, user);
- }
-
- // restore the order, youngest file is the first file again
- Collections.reverse(targets);
- }
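
The index arithmetic above enumerates every window of between
inputFileCountMin and inputFileCountMax consecutive hoplogs and scores each
window with computeGain. A minimal sketch of just the enumeration, using
assumed values (6 hoplogs, min 3, max 5); the class name is illustrative:

    // Hypothetical illustration; not part of the original source.
    public class WindowEnumerationSketch {
      public static void main(String[] args) {
        int size = 6, min = 3, max = 5;
        for (int from = 0; from <= size - min; from++) {
          int minTo = from + min - 1;                  // smallest legal "to"
          int maxTo = Math.min(from + max, size);      // exclusive upper bound
          for (int to = minTo; to < maxTo; to++) {
            // each window spans targets[from..to], i.e. min..max files
            System.out.println("from=" + from + " to=" + to
                + " files=" + (to - from + 1));
          }
        }
      }
    }
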
-
- @Override
- public HDFSStore getHdfsStore() {
- return store;
- }
- }
-
- Float computeGain(int from, int to, List<TrackedReference<Hoplog>> targets) {
- double SIZE_64K = 64.0 * 1024;
- // TODO the base for log should depend on the average number of keys an index block will contain
- double LOG_BASE = Math.log(AVG_NUM_KEYS_PER_INDEX_BLOCK);
-
- long totalSize = 0;
- double costBefore = 0f;
- for (int i = from; i <= to; i++) {
- long size = targets.get(i).get().getSize();
- if (size == 0) {
- continue;
- }
- totalSize += size;
-
- // For each hoplog file, read cost is number of index block reads and 1
- // data block read. Index blocks on an average contain N keys and are
- // organized in a N-ary tree structure. Hence the number of index block
- // reads will be logBaseN(number of data blocks)
- costBefore += Math.ceil(Math.max(1.0, Math.log(size / SIZE_64K) / LOG_BASE)) + 1;
- }
-
- // if the first file is disproportionately large, this set is a bad candidate for compaction
- long firstFileSize = targets.get(from).get().getSize();
- if (firstFileSize > (totalSize - firstFileSize) * RATIO) {
- if (logger.isDebugEnabled()){
- fineLog("First file too big:", firstFileSize, " totalSize:", totalSize);
- }
- return null;
- }
-
- // compute size in mb so that the value of gain is in a few decimals
- long totalSizeInMb = totalSize / 1024 / 1024;
- if (totalSizeInMb == 0) {
- // the files are too small, just return the count. The more we compact
- // the better it is
- if (logger.isDebugEnabled()) {
- logger.debug("{}Total size too small:" +totalSize, logPrefix);
- }
- return (float) costBefore;
- }
-
- double costAfter = Math.ceil(Math.log(totalSize / SIZE_64K) / LOG_BASE) + 1;
- return (float) ((costBefore - costAfter) / totalSizeInMb);
- }
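
The gain metric above is read cost saved per megabyte compacted. A minimal
standalone sketch of the same arithmetic, using five hypothetical 10 MB
hoplogs and ignoring the first-file ratio guard; sizes and class name are
illustrative assumptions:

    // Hypothetical illustration of the computeGain cost model.
    public class CompactionGainSketch {
      static final double SIZE_64K = 64.0 * 1024;
      static final double LOG_BASE = Math.log(200);    // AVG_NUM_KEYS_PER_INDEX_BLOCK

      // read cost of one hoplog: index block reads plus one data block read
      static double readCost(long size) {
        return Math.ceil(Math.max(1.0, Math.log(size / SIZE_64K) / LOG_BASE)) + 1;
      }

      public static void main(String[] args) {
        long tenMb = 10L << 20;
        long totalSize = 5 * tenMb;                    // five 10 MB hoplogs
        double costBefore = 5 * readCost(tenMb);       // 5 * 2.0 = 10.0
        double costAfter = readCost(totalSize);        // 3.0 for one 50 MB file
        long totalSizeInMb = totalSize / 1024 / 1024;  // 50
        // (10.0 - 3.0) / 50 = 0.14; a higher gain is a better candidate set
        System.out.println((costBefore - costAfter) / totalSizeInMb);
      }
    }
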
-
- /**
- * Hoplog readers are accessed asynchronously. There could be a window in
- * which, while a hoplog is being iterated on, it gets compacted and becomes
- * expired or inactive. The reader of the hoplog must not be closed till the
- * iterator completes. All such scenarios are managed by this class. It
- * keeps all the readers, active and inactive, and a reference counter for
- * each reader. An inactive reader will be closed if the reference count goes
- * down to 0.
- *
- * One important point: only the compaction process makes a hoplog inactive.
- * Compaction in a bucket is single threaded, so compaction itself will not
- * face a race condition. Read and scan operations on the bucket are affected,
- * however, so the reference counter is incremented for each read and scan.
- *
- */
- private class HoplogReadersController implements HoplogReaderActivityListener {
- private Integer maxOpenFilesLimit;
-
- // sorted collection of all the active oplog files associated with this bucket. Instead of a
- // queue, a set is used. New files created as part of compaction may be inserted after a few
- // hoplogs were created. The compacted file is such a case and should not be treated as the newest.
- private final ConcurrentSkipListSet<TrackedReference<Hoplog>> hoplogs;
-
- // list of all the hoplogs that have been compacted and need to be closed
- // once the reference count reduces to 0
- private final ConcurrentHashSet<TrackedReference<Hoplog>> inactiveHoplogs;
-
- // ReadWriteLock on list of oplogs to allow for consistent reads and scans
- // while hoplog set changes. A write lock is needed on completion of
- // compaction, addition of a new hoplog, or on receiving an update message from
- // other GF nodes
- private final ReadWriteLock hoplogRWLock = new ReentrantReadWriteLock(true);
-
- // tracks the number of active readers for hoplogs of this bucket
- private AtomicInteger activeReaderCount = new AtomicInteger(0);
-
- public HoplogReadersController() {
- HoplogComparator comp = new HoplogComparator();
- hoplogs = new ConcurrentSkipListSet<TrackedReference<Hoplog>>(comp) {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean add(TrackedReference<Hoplog> e) {
- // increment number of hoplogs active for this bucket
- boolean result = super.add(e);
- if (result)
- stats.incActiveFiles(1);
- return result;
- }
-
- @Override
- public boolean remove(Object o) {
- // decrement the number of hoplogs active for this bucket
- boolean result = super.remove(o);
- if (result)
- stats.incActiveFiles(-1);
- return result;
- }
- };
-
- inactiveHoplogs = new ConcurrentHashSet<TrackedReference<Hoplog>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean add(TrackedReference<Hoplog> e) {
- boolean result = super.add(e);
- if (result)
- stats.incInactiveFiles(1);
- return result;
- }
-
- @Override
- public boolean remove(Object o) {
- boolean result = super.remove(o);
- if (result)
- stats.incInactiveFiles(-1);
- return result;
- }
- };
-
- maxOpenFilesLimit = Integer.getInteger(
- HoplogConfig.BUCKET_MAX_OPEN_HFILES_CONF,
- HoplogConfig.BUCKET_MAX_OPEN_HFILES_DEFAULT);
- }
-
- Hoplog getOldestHoplog() {
- if (hoplogs.isEmpty()) {
- return null;
- }
- return hoplogs.last().get();
- }
-
- /**
- * Locks the sorted oplog collection and performs the add operation
- * @return if addition was successful
- */
- private boolean addSortedOplog(Hoplog so) throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Try add " + so, logPrefix);
- }
- hoplogRWLock.writeLock().lock();
- try {
- int size = hoplogs.size();
- boolean result = hoplogs.add(new TrackedReference<Hoplog>(so));
- so.setReaderActivityListener(this);
- if (logger.isDebugEnabled()){
- fineLog("Added: ", so, " Before:", size, " After:", hoplogs.size());
- }
- return result;
- } finally {
- hoplogRWLock.writeLock().unlock();
- }
- }
-
- /**
- * Locks the sorted oplog collection, performs the remove operation, and also updates readers
- */
- private void removeSortedOplog(TrackedReference<Hoplog> so) throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("Try remove " + so, logPrefix);
- }
- hoplogRWLock.writeLock().lock();
- try {
- int size = hoplogs.size();
- boolean result = hoplogs.remove(so);
- if (result) {
- inactiveHoplogs.add(so);
- if (logger.isDebugEnabled()) {
- fineLog("Removed: ", so, " Before:", size, " After:", hoplogs.size());
- }
- } else {
- if (inactiveHoplogs.contains(so)) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Found a missing active hoplog in inactive list." + so, logPrefix);
- }
- } else {
- so.get().close();
- logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_MISSING_IN_BUCKET_FORCED_CLOSED, so.get()));
- }
- }
- } finally {
- hoplogRWLock.writeLock().unlock();
- }
- }
-
- private void closeInactiveHoplogs() throws IOException {
- hoplogRWLock.writeLock().lock();
- try {
- for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
- if (logger.isDebugEnabled()){
- logger.debug("{}Try close inactive " + hoplog, logPrefix);
- }
-
- if (!hoplog.inUse()) {
- int size = inactiveHoplogs.size();
- inactiveHoplogs.remove(hoplog);
- closeReaderAndSuppressError(hoplog.get(), true);
- if (logger.isDebugEnabled()){
- fineLog("Closed inactive: ", hoplog.get(), " Before:", size,
- " After:", inactiveHoplogs.size());
- }
- }
- }
- } finally {
- hoplogRWLock.writeLock().unlock();
- }
- }
-
- /**
- * @param target
- * name of the hoplog file
- * @return trackedReference if target exists in inactive hoplog list.
- * @throws IOException
- */
- TrackedReference<Hoplog> getInactiveHoplog(String target) throws IOException {
- hoplogRWLock.writeLock().lock();
- try {
- for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
- if (hoplog.get().getFileName().equals(target)) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Target found in inactive hoplogs list: " + hoplog, logPrefix);
- }
- return hoplog;
- }
- }
- if (logger.isDebugEnabled()){
- logger.debug("{}Target not found in inactive hoplogs list: " + target, logPrefix);
- }
- return null;
- } finally {
- hoplogRWLock.writeLock().unlock();
- }
- }
-
- /**
- * force closes all readers
- */
- public void close() throws IOException {
- hoplogRWLock.writeLock().lock();
- try {
- for (TrackedReference<Hoplog> hoplog : hoplogs) {
- closeReaderAndSuppressError(hoplog.get(), true);
- }
-
- for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
- closeReaderAndSuppressError(hoplog.get(), true);
- }
- } finally {
- hoplogs.clear();
- inactiveHoplogs.clear();
- hoplogRWLock.writeLock().unlock();
- }
- }
-
- /**
- * Locks hoplogs to create a snapshot of active hoplogs. The reference count
- * of each reader is incremented to keep it from being closed.
- *
- * @return ordered list of sorted oplogs
- */
- private List<TrackedReference<Hoplog>> getTrackedSortedOplogList(String user) {
- List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
- hoplogRWLock.readLock().lock();
- try {
- for (TrackedReference<Hoplog> oplog : hoplogs) {
- oplog.increment(user);
- oplogs.add(oplog);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Track ref " + oplog, logPrefix);
- }
- }
- } finally {
- hoplogRWLock.readLock().unlock();
- }
- return oplogs;
- }
-
- private TrackedReference<Hoplog> trackHoplog(Hoplog hoplog, String user) {
- hoplogRWLock.readLock().lock();
- try {
- for (TrackedReference<Hoplog> oplog : hoplogs) {
- if (oplog.get().getFileName().equals(hoplog.getFileName())) {
- oplog.increment(user);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Track " + oplog, logPrefix);
- }
- return oplog;
- }
- }
- } finally {
- hoplogRWLock.readLock().unlock();
- }
- throw new NoSuchElementException(hoplog.getFileName());
- }
-
- public void releaseHoplogs(List<TrackedReference<Hoplog>> targets, String user) {
- if (targets == null) {
- return;
- }
-
- for (int i = targets.size() - 1; i >= 0; i--) {
- TrackedReference<Hoplog> hoplog = targets.get(i);
- releaseHoplog(hoplog, user);
- }
- }
-
- public void releaseHoplog(TrackedReference<Hoplog> target, String user) {
- if (target == null) {
- return;
- }
-
- target.decrement(user);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Try release " + target, logPrefix);
- }
- if (target.inUse()) {
- return;
- }
-
- // There are no users of this hoplog. If it is inactive, close it.
- hoplogRWLock.writeLock().lock();
- try {
- if (!target.inUse()) {
- if (inactiveHoplogs.contains(target) ) {
- int sizeBefore = inactiveHoplogs.size();
- inactiveHoplogs.remove(target);
- closeReaderAndSuppressError(target.get(), true);
- if (logger.isDebugEnabled()) {
- fineLog("Closed inactive: ", target, " totalBefore:", sizeBefore,
- " totalAfter:", inactiveHoplogs.size());
- }
- } else if (hoplogs.contains(target)) {
- closeExcessReaders();
- }
- }
- } catch (IOException e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR,
- "Close reader: " + target.get().getFileName()), e);
- } finally {
- hoplogRWLock.writeLock().unlock();
- }
- }
-
- /*
- * Detects if the total number of open HDFS readers is more than the
- * configured max open file limit. In case the limit is exceeded, some
- * readers need to be closed to avoid a datanode receiver overflow error.
- */
- private void closeExcessReaders() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Close excess readers. Size:" + hoplogs.size()
- + " activeReaders:" + activeReaderCount.get() + " limit:"
- + maxOpenFilesLimit, logPrefix);
- }
-
- if (hoplogs.size() <= maxOpenFilesLimit) {
- return;
- }
-
- if (activeReaderCount.get() <= maxOpenFilesLimit) {
- return;
- }
-
- for (TrackedReference<Hoplog> hoplog : hoplogs.descendingSet()) {
- if (!hoplog.inUse() && !hoplog.get().isClosed()) {
- hoplog.get().close(false);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Excess reader closed " + hoplog, logPrefix);
- }
- }
-
- if (activeReaderCount.get() <= maxOpenFilesLimit) {
- return;
- }
- }
- }
-
- @Override
- public void readerCreated() {
- activeReaderCount.incrementAndGet();
- stats.incActiveReaders(1);
- if (logger.isDebugEnabled())
- logger.debug("{}ActiveReader++", logPrefix);
- }
-
- @Override
- public void readerClosed() {
- activeReaderCount.decrementAndGet();
- stats.incActiveReaders(-1);
- if (logger.isDebugEnabled())
- logger.debug("{}ActiveReader--", logPrefix);
- }
- }
-
- /**
- * returns an ordered list of oplogs, FOR TESTING ONLY
- */
- public List<TrackedReference<Hoplog>> getSortedOplogs() throws IOException {
- List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
- for (TrackedReference<Hoplog> oplog : hoplogReadersController.hoplogs) {
- oplogs.add(oplog);
- }
- return oplogs;
- }
-
- /**
- * Merged iterator on a list of hoplogs.
- */
- public class BucketIterator implements HoplogIterator<byte[], SortedHoplogPersistedEvent> {
- // list of hoplogs to be iterated on.
- final List<TrackedReference<Hoplog>> hoplogList;
- HoplogSetIterator mergedIter;
-
- public BucketIterator(List<TrackedReference<Hoplog>> hoplogs) throws IOException {
- this.hoplogList = hoplogs;
- try {
- mergedIter = new HoplogSetIterator(this.hoplogList);
- if (logger.isDebugEnabled()) {
- for (TrackedReference<Hoplog> hoplog : hoplogs) {
- logger.debug("{}BucketIter target hop:" + hoplog.get().getFileName(), logPrefix);
- }
- }
- } catch (IllegalArgumentException e) {
- if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
- throw handleIOError((IOException) e.getCause());
- } else {
- throw e;
- }
- } catch (IOException e) {
- throw handleIOError(e);
- } catch (HDFSIOException e) {
- throw handleIOError(e);
- }
- }
-
- @Override
- public boolean hasNext() {
- return mergedIter.hasNext();
- }
-
- @Override
- public byte[] next() throws IOException {
- try {
- return HFileSortedOplog.byteBufferToArray(mergedIter.next());
- } catch (IllegalArgumentException e) {
- if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
- throw handleIOError((IOException) e.getCause());
- } else {
- throw e;
- }
- } catch (IOException e) {
- throw handleIOError(e);
- }
- }
-
- @Override
- public byte[] getKey() {
- // The merged iterator returns a byte[]. This needs to be deserialized to
- // the object that was provided during the flush operation.
- return HFileSortedOplog.byteBufferToArray(mergedIter.getKey());
- }
-
- @Override
- public SortedHoplogPersistedEvent getValue() {
- // The merged iterator returns a byte[]. This needs to be deserialized to
- // the object that was provided during the flush operation.
- try {
- return deserializeValue(HFileSortedOplog.byteBufferToArray(mergedIter.getValue()));
- } catch (IOException e) {
- throw new HDFSIOException("Failed to deserialize bytes while iterating on partition", e);
- }
- }
-
- @Override
- public void remove() {
- mergedIter.remove();
- }
-
- @Override
- public void close() {
- // TODO release the closed iterators early
- String user = logger.isDebugEnabled() ? "Scan" : null;
- hoplogReadersController.releaseHoplogs(hoplogList, user);
- }
- }
-
- /**
- * This utility class is used to filter temporary hoplogs in a bucket
- * directory
- *
- */
- private static class TmpFilePathFilter implements PathFilter {
- @Override
- public boolean accept(Path path) {
- Matcher matcher = HOPLOG_NAME_PATTERN.matcher(path.getName());
- if (matcher.matches() && path.getName().endsWith(TEMP_HOPLOG_EXTENSION)) {
- return true;
- }
- return false;
- }
- }
-
- private void fineLog(Object... strings) {
- if (logger.isDebugEnabled()) {
- StringBuffer sb = concatString(strings);
- logger.debug(logPrefix + sb.toString());
- }
- }
-
- private StringBuffer concatString(Object... strings) {
- StringBuffer sb = new StringBuffer();
- for (Object str : strings) {
- sb.append(str.toString());
- }
- return sb;
- }
-
- @Override
- public void compactionCompleted(String region, int bucket, boolean isMajor) {
- // do nothing for compaction events. Hoplog Organizer depends on addition
- // and deletion of hoplogs only
- }
-}
-
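
Context for the removal above: HoplogReadersController guards its hoplog set with a ReentrantReadWriteLock and reference-counts every reader, so a file is only closed once no scan still holds it. Below is a minimal, self-contained sketch of that pattern; RefCountedReaders and CountedRef are hypothetical stand-ins for the deleted TrackedReference<Hoplog> machinery, and this simplified variant closes a reader as soon as its count drops to zero, whereas the deleted code additionally distinguishes active from inactive files.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical sketch of the reference-counted reader pattern above.
    public class RefCountedReaders<R extends Closeable> {

      public static final class CountedRef<R> {
        final R reader;
        final AtomicInteger users = new AtomicInteger();
        CountedRef(R reader) { this.reader = reader; }
        boolean inUse() { return users.get() > 0; }
      }

      private final List<CountedRef<R>> active = new ArrayList<>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      public void add(R reader) {
        lock.writeLock().lock();
        try {
          active.add(new CountedRef<>(reader));
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Snapshot under the read lock; each count is incremented so a
      // concurrent close cannot invalidate a reader the caller still holds.
      public List<CountedRef<R>> acquireAll() {
        lock.readLock().lock();
        try {
          List<CountedRef<R>> snapshot = new ArrayList<>(active.size());
          for (CountedRef<R> ref : active) {
            ref.users.incrementAndGet();
            snapshot.add(ref);
          }
          return snapshot;
        } finally {
          lock.readLock().unlock();
        }
      }

      // Decrement under the write lock and close only when no user remains,
      // mirroring the releaseHoplog flow in the deleted organizer.
      public void release(CountedRef<R> ref) throws IOException {
        lock.writeLock().lock();
        try {
          ref.users.decrementAndGet();
          if (!ref.inUse() && active.remove(ref)) {
            ref.reader.close();
          }
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
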
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
deleted file mode 100644
index e622749..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import com.gemstone.gemfire.internal.hll.ICardinality;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-
-/**
- * Ordered sequence file
- */
-public interface Hoplog extends Closeable, Comparable<Hoplog> {
- public static final boolean NOP_WRITE = Boolean.getBoolean("Hoplog.NOP_WRITE");
-
- /** the gemfire magic number for sorted oplogs */
- public static final byte[] MAGIC = new byte[] { 0x47, 0x53, 0x4F, 0x50 };
-
- /**
- * @return an instance of the cached reader; creates one if it does not exist
- * @throws IOException
- */
- HoplogReader getReader() throws IOException;
-
- /**
- * Creates a new sorted writer.
- *
- * @param keys
- * an estimate of the number of keys to be written
- * @return the writer
- * @throws IOException
- * error creating writer
- */
- HoplogWriter createWriter(int keys) throws IOException;
-
- /**
- * @param listener listener of reader's activity
- */
- void setReaderActivityListener(HoplogReaderActivityListener listener);
-
- /**
- * @return file name
- */
- String getFileName();
-
- /**
- * @return Entry count estimate for this hoplog
- */
- public ICardinality getEntryCountEstimate() throws IOException;
-
- /**
- * renames the file to the input name
- *
- * @throws IOException
- */
- void rename(String name) throws IOException;
-
- /**
- * Deletes the sorted oplog file
- */
- void delete() throws IOException;
-
- /**
- * Returns true if the hoplog is closed for reads.
- * @return true if closed
- */
- boolean isClosed();
-
- /**
- * @param clearCache clear this sorted oplog's cache if true
- * @throws IOException
- */
- void close(boolean clearCache) throws IOException;
-
- /**
- * @return the modification timestamp of the file
- */
- long getModificationTimeStamp();
-
- /**
- * @return the size of file
- */
- long getSize();
-
- /**
- * Reads sorted oplog file.
- */
- public interface HoplogReader extends HoplogSetReader<byte[], byte[]> {
- /**
- * Returns a byte buffer based view of the value linked to the key
- */
- ByteBuffer get(byte[] key) throws IOException;
-
- /**
- * @return the bloom filter associated with this sorted oplog file.
- */
- BloomFilter getBloomFilter() throws IOException;
-
- /**
- * @return number of KV pairs in the file, including tombstone entries
- */
- long getEntryCount();
-
- /**
- * Returns the {@link ICardinality} implementation that is useful for
- * estimating the size of this Hoplog.
- *
- * @return the cardinality estimator
- */
- ICardinality getCardinalityEstimator();
- }
-
- /**
- * Provides events related to a hoplog reader's activity to its owners
- *
- */
- public interface HoplogReaderActivityListener {
- /**
- * Invoked when a reader is created and an active reader did not exist
- * earlier
- */
- public void readerCreated();
-
- /**
- * Invoked when an active reader is closed
- */
- public void readerClosed();
- }
-
- /**
- * Writes key/value pairs in a sorted oplog file. Each entry that is appended must have a key that
- * is greater than or equal to the previous key.
- */
- public interface HoplogWriter extends Closeable {
- /**
- * Appends another key and value. The key is expected to be greater than or equal to the last
- * key that was appended.
- * @param key the key to append; must be greater than or equal to the previously appended key
- * @param value the value associated with the key
- */
- void append(byte[] key, byte[] value) throws IOException;
-
- /**
- * Appends another key and value. The key is expected to be greater than or equal to the last
- * key that was appended.
- */
- void append(ByteBuffer key, ByteBuffer value) throws IOException;
-
- void close(EnumMap<Meta, byte[]> metadata) throws IOException;
-
- /**
- * flushes all outstanding data into the OS buffers on all DN replicas
- * @throws IOException
- */
- void hsync() throws IOException;
-
- /**
- * Gets the size of the data that has already been written
- * to the writer.
- *
- * @return number of bytes already written to the writer
- */
- public long getCurrentSize() throws IOException;
- }
-
- /**
- * Identifies the gemfire sorted oplog versions.
- */
- public enum HoplogVersion {
- V1;
-
- /**
- * Returns the version string as bytes.
- *
- * @return the byte form
- */
- public byte[] toBytes() {
- return name().getBytes();
- }
-
- /**
- * Constructs the version from a byte array.
- *
- * @param version
- * the byte form of the version
- * @return the version enum
- */
- public static HoplogVersion fromBytes(byte[] version) {
- return HoplogVersion.valueOf(new String(version));
- }
- }
-
- /**
- * Names the available metadata keys that will be stored in the sorted oplog.
- */
- public enum Meta {
- /** identifies the soplog as a gemfire file, required */
- GEMFIRE_MAGIC,
-
- /** identifies the soplog version, required */
- SORTED_OPLOG_VERSION,
-
- /** identifies the gemfire version the soplog was created with */
- GEMFIRE_VERSION,
-
- /** identifies the statistics data */
- STATISTICS,
-
- /** identifies the embedded comparator types */
- COMPARATORS,
-
- /** identifies the pdx type data, optional */
- PDX,
-
- /**
- * identifies the hyperLogLog byte[] which estimates the cardinality for
- * only one hoplog
- */
- LOCAL_CARDINALITY_ESTIMATE,
-
- /**
- * represents the hyperLogLog byte[] after upgrading the constant from
- * 0.1 to 0.03 (in gfxd 1.4)
- */
- LOCAL_CARDINALITY_ESTIMATE_V2
- ;
-
- /**
- * Converts the metadata name to bytes.
- */
- public byte[] toBytes() {
- return ("gemfire." + name()).getBytes();
- }
-
- /**
- * Converts the byte form of the name to an enum.
- *
- * @param key
- * the key as bytes
- * @return the enum form
- */
- public static Meta fromBytes(byte[] key) {
- return Meta.valueOf(new String(key).substring("gemfire.".length()));
- }
- }
-}
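
The Meta enum removed above round-trips each metadata key through a byte[] form by prepending a "gemfire." prefix to the enum name. A standalone sketch of that encoding follows; MetaKeyCodec, the trimmed enum body, and the explicit UTF-8 charset are illustrative additions (the deleted code used the platform default charset).

    import java.nio.charset.StandardCharsets;

    // Standalone copy of the prefix-based key encoding used by Hoplog.Meta.
    public class MetaKeyCodec {
      private static final String PREFIX = "gemfire.";

      enum Meta { GEMFIRE_MAGIC, SORTED_OPLOG_VERSION, GEMFIRE_VERSION }

      static byte[] toBytes(Meta m) {
        return (PREFIX + m.name()).getBytes(StandardCharsets.UTF_8);
      }

      static Meta fromBytes(byte[] key) {
        String name = new String(key, StandardCharsets.UTF_8);
        return Meta.valueOf(name.substring(PREFIX.length()));
      }

      public static void main(String[] args) {
        byte[] key = toBytes(Meta.SORTED_OPLOG_VERSION);
        // Prints SORTED_OPLOG_VERSION: the byte form round-trips cleanly.
        System.out.println(fromBytes(key));
      }
    }
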
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java
deleted file mode 100644
index 7b8415e..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-
-/**
- * This interface defines all the hoplog configuration-related constants. One
- * location simplifies searching for a constant.
- *
- */
-public interface HoplogConfig {
- // Max number of open files per bucket. By default each region has 113
- // buckets. A typical HDFS deployment has 5 DNs, each allowing 4096 open
- // files. The intent is to use around 40% of these, hence the default
- // value is 72.
- public static final String BUCKET_MAX_OPEN_HFILES_CONF = "hoplog.bucket.max.open.files";
- public static final Integer BUCKET_MAX_OPEN_HFILES_DEFAULT = 72;
-
- public static final String HFILE_BLOCK_SIZE_CONF = "hoplog.hfile.block.size";
-
- // Region maintenance activity interval. Default is 2 minutes.
- public static final String JANITOR_INTERVAL_SECS = "hoplog.janitor.interval.secs";
- public static final long JANITOR_INTERVAL_SECS_DEFAULT = 120l;
-
- // Maximum number of milliseconds to wait for suspension action to complete
- public static final String SUSPEND_MAX_WAIT_MS = "hoplog.suspend.max.wait.ms";
- public static final long SUSPEND_MAX_WAIT_MS_DEFAULT = 1000l;
-
- // Compaction request queue capacity configuration
- public static final String COMPCATION_QUEUE_CAPACITY = "hoplog.compaction.queue.capacity";
- public static final int COMPCATION_QUEUE_CAPACITY_DEFAULT = 500;
-
- // Compaction file size ratio configuration
- public static final String COMPACTION_FILE_RATIO = "hoplog.compaction.file.ratio";
- public static final float COMPACTION_FILE_RATIO_DEFAULT = 1.3f;
-
- // Amount of time before deleting old temporary files
- public static final String TMP_FILE_EXPIRATION = "hoplog.tmp.file.expiration.ms";
- public static final long TMP_FILE_EXPIRATION_DEFAULT = 10 * 60 * 1000;
-
- // If this property is set to true, GemFire will let the DFS client cache FS objects
- public static final String USE_FS_CACHE = "hoplog.use.fs.cache";
-
- // If set, the HDFS store will be able to connect to the local file system
- public static final String ALLOW_LOCAL_HDFS_PROP = "hoplog.ALLOW_LOCAL_HDFS";
-
- // The following constants are used to read Kerberos authentication related
- // configuration. Currently these configurations are provided via a client
- // config file when the HDFS store is created.
- public static final String KERBEROS_PRINCIPAL = "gemfirexd.kerberos.principal";
- public static final String KERBEROS_KEYTAB_FILE= "gemfirexd.kerberos.keytab.file";
- public static final String PERFORM_SECURE_HDFS_CHECK_PROP = "gemfire.PERFORM_SECURE_HDFS_CHECK";
-
- // Name of the clean-up interval file that is exposed to MapReduce jobs
- public static final String CLEAN_UP_INTERVAL_FILE_NAME = "cleanUpInterval";
- // Compression settings
- public static final String COMPRESSION = "hoplog.compression.algorithm";
- public static final String COMPRESSION_DEFAULT = "NONE";
-
-}
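
The constants removed above are plain system-property names; callers resolve them with the java.lang wrappers, as the organizer does for the open-file limit via Integer.getInteger. A minimal sketch of that lookup pattern, with the property names and defaults inlined from the interface so the snippet stands alone (HoplogConfigDemo itself is a hypothetical name):

    // Sketch of how the HoplogConfig constants are consumed: each is a
    // system property name resolved against a default value.
    public class HoplogConfigDemo {
      static final String BUCKET_MAX_OPEN_HFILES_CONF = "hoplog.bucket.max.open.files";
      static final int BUCKET_MAX_OPEN_HFILES_DEFAULT = 72;
      static final String JANITOR_INTERVAL_SECS = "hoplog.janitor.interval.secs";
      static final long JANITOR_INTERVAL_SECS_DEFAULT = 120L;

      public static void main(String[] args) {
        // Integer.getInteger/Long.getLong read a system property and fall
        // back to the supplied default when the property is absent.
        int maxOpenFiles = Integer.getInteger(
            BUCKET_MAX_OPEN_HFILES_CONF, BUCKET_MAX_OPEN_HFILES_DEFAULT);
        long janitorSecs = Long.getLong(
            JANITOR_INTERVAL_SECS, JANITOR_INTERVAL_SECS_DEFAULT);

        System.out.println("max open hfiles per bucket = " + maxOpenFiles);
        System.out.println("janitor interval secs      = " + janitorSecs);
      }
    }

Running with -Dhoplog.bucket.max.open.files=100 overrides the default, which is how the deleted organizer picked up its per-bucket limit.
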
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java
deleted file mode 100644
index 7c3de03..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-
-/**
- * Defines an observer of asynchronous operations on sorted oplog files associated with a bucket.
- */
-public interface HoplogListener {
- /**
- * Notifies creation of new sorted oplog files. A new file will be created after compaction or
- * other bucket maintenance activities
- *
- * @throws IOException
- */
- void hoplogCreated(String regionFolder, int bucketId, Hoplog... oplogs) throws IOException;
-
- /**
- * Notifies file deletion. A file becomes redundant after compaction or other bucket maintenance
- * activities
- * @throws IOException
- */
- void hoplogDeleted(String regionFolder, int bucketId, Hoplog... oplogs) throws IOException;
-
- /**
- * Notifies completion of a hoplog compaction cycle.
- * @param region Region on which compaction was performed
- * @param bucket bucket id
- * @param isMajor true if major compaction was executed
- */
- void compactionCompleted(String region, int bucket, boolean isMajor);
-}
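
For reference, an implementation of the listener contract removed above only has to react to file creation and deletion; the organizer deliberately ignores compactionCompleted. A minimal, self-contained sketch that just counts live hoplogs per bucket: the Listener interface here mirrors the deleted HoplogListener signatures with the Hoplog parameter reduced to a file-name String so the snippet compiles on its own, and CountingListener is a hypothetical name.

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class CountingListener {

      interface Listener {
        void hoplogCreated(String regionFolder, int bucketId, String... files) throws IOException;
        void hoplogDeleted(String regionFolder, int bucketId, String... files) throws IOException;
        void compactionCompleted(String region, int bucket, boolean isMajor);
      }

      static class Counting implements Listener {
        final ConcurrentMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();

        private AtomicInteger counter(String regionFolder, int bucketId) {
          return counts.computeIfAbsent(regionFolder + "/" + bucketId,
              k -> new AtomicInteger());
        }

        @Override
        public void hoplogCreated(String regionFolder, int bucketId, String... files) {
          counter(regionFolder, bucketId).addAndGet(files.length);
        }

        @Override
        public void hoplogDeleted(String regionFolder, int bucketId, String... files) {
          counter(regionFolder, bucketId).addAndGet(-files.length);
        }

        @Override
        public void compactionCompleted(String region, int bucket, boolean isMajor) {
          // Like the deleted organizer, depend only on creation and deletion
          // events; compaction itself needs no extra bookkeeping here.
        }
      }

      public static void main(String[] args) throws IOException {
        Counting l = new Counting();
        l.hoplogCreated("region1", 0, "a.hop", "b.hop");
        l.hoplogDeleted("region1", 0, "a.hop");
        System.out.println(l.counts); // {region1/0=1}
      }
    }
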