Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 20:28:01 UTC
[09/16] incubator-geode git commit: GEODE-1072: Removing HDFS related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
deleted file mode 100644
index f7d746d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.Future;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-
-/**
- * Manages bucket-level operations on sorted oplog files, including creation, reading, serde,
- * bloom buffering, and compaction. Abstracts away the existence of multiple sorted oplog files.
- */
-public interface HoplogOrganizer<T extends PersistedEventImpl> extends HoplogSetReader<byte[], T>,
- HoplogListener, Closeable {
-
- /**
- * Iterates on the input buffer and persists it in a new sorted oplog. This invocation may block
- * if there are too many outstanding write requests.
- *
- * @param bufferIter
- * ordered iterator on a buffer of objects to be persisted
- * @param count
- * number of K,V pairs expected to be part of flush, 0 if unknown
- * @throws IOException
- */
- public void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, int count)
- throws IOException, ForceReattemptException;
-
-
- /**
- * Clear the data in HDFS. This method assumes that the
- * dispatcher thread has already been paused, so there should be
- * no concurrent flushes to HDFS when this method is called.
- *
- * @throws IOException
- */
- public void clear() throws IOException;
-
- /**
- * returns the compactor associated with this set
- */
- public Compactor getCompactor();
-
- /**
-   * Called to execute bucket maintenance activities, like purging expired files
-   * and creating compaction tasks. Long-running activities must be executed
-   * asynchronously, not on this thread, to avoid impacting other buckets.
- * @throws IOException
- */
- public void performMaintenance() throws IOException;
-
- /**
- * Schedules a compaction task and returns immediately.
- *
- * @param isMajor true for major compaction, false for minor compaction
- * @return future for status of compaction request
- */
- public Future<CompactionStatus> forceCompaction(boolean isMajor);
-
- /**
- * Returns the timestamp of the last completed major compaction
- *
- * @return the timestamp or 0 if a major compaction has not taken place yet
- */
- public long getLastMajorCompactionTimestamp();
-
- public interface Compactor {
- /**
- * Requests a compaction operation be performed on this set of sorted oplogs.
- *
- * @param isMajor true for major compaction
- * @param isForced true if the compaction should be carried out even if there
- * is only one hoplog to compact
- *
- * @return true if compaction was performed, false otherwise
- * @throws IOException
- */
- boolean compact(boolean isMajor, boolean isForced) throws IOException;
-
- /**
-     * Stops the current compaction operation midway and suspends
-     * compaction operations. The current compaction data
-     * will be thrown away, and no more compactions will be performed
-     * until resume is called.
- */
- void suspend();
-
- /**
- * Resume compaction operations.
- */
- void resume();
-
- /**
- * @return true if the compactor is not ready or busy
- */
- boolean isBusy(boolean isMajor);
-
- /**
- * @return the hdfsStore configuration used by this compactor
- */
- public HDFSStore getHdfsStore();
- }
-}
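
For context on how this interface was consumed, a minimal usage sketch (not actual Geode code; the organizer and the ordered event iterator are assumed to be obtained elsewhere, and only methods declared in the interface above are used):

    // Sketch: flush an ordered buffer, then request a minor compaction.
    void flushAndCompact(HoplogOrganizer<SortedHoplogPersistedEvent> organizer,
        Iterator<? extends QueuedPersistentEvent> sortedEvents, int count)
        throws Exception { // IOException, ForceReattemptException, ExecutionException
      // persist the ordered buffer as a new sorted oplog; may block under
      // write back-pressure
      organizer.flush(sortedEvents, count);

      // schedule a minor compaction and wait for its outcome
      Future<CompactionStatus> status = organizer.forceCompaction(false /* isMajor */);
      status.get();
    }
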
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
deleted file mode 100644
index 16939db..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HFileSortedOplog.HFileReader.HFileSortedIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-
-/**
- * Provides a merged iterator on set of {@link HFileSortedOplog}
- */
-public class HoplogSetIterator implements HoplogIterator<ByteBuffer, ByteBuffer> {
- private final List<HFileSortedIterator> iters;
-
- // Number of entries remaining to be iterated by this scanner
- private int entriesRemaining;
-
-  // key and value at the current head of the merged iteration
- private ByteBuffer currentKey;
- private ByteBuffer currentValue;
-
- public HoplogSetIterator(List<TrackedReference<Hoplog>> targets) throws IOException {
- iters = new ArrayList<HFileSortedIterator>();
- for (TrackedReference<Hoplog> oplog : targets) {
- HFileSortedIterator iter = (HFileSortedIterator) oplog.get().getReader().scan();
- if (!iter.hasNext()) {
- // the oplog is empty, exclude from iterator
- continue;
- }
-
- // initialize the iterator
- iter.nextBB();
- iters.add(iter);
- entriesRemaining += oplog.get().getReader().getEntryCount();
- }
- }
-
- public boolean hasNext() {
- return entriesRemaining > 0;
- }
-
- @Override
- public ByteBuffer next() throws IOException {
- return nextBB();
- }
- public ByteBuffer nextBB() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- seekToMinKeyIter();
-
- return currentKey;
- }
-
- private void seekToMinKeyIter() throws IOException {
- HFileSortedIterator currentIter = null;
- ByteBuffer minKey = null;
-
-    // scan through all hoplog iterators to find the iterator with the smallest
-    // key at its head, and remove duplicate keys
- for (Iterator<HFileSortedIterator> iterator = iters.iterator(); iterator.hasNext();) {
- HFileSortedIterator iter = iterator.next();
-
- ByteBuffer tmpK = iter.getKeyBB();
- ByteBuffer tmpV = iter.getValueBB();
- if (minKey == null || ByteComparator.compareBytes(tmpK.array(), tmpK.arrayOffset(), tmpK.remaining(), minKey.array(), minKey.arrayOffset(), minKey.remaining()) < 0) {
- minKey = tmpK;
- currentKey = tmpK;
- currentValue = tmpV;
- currentIter = iter;
- } else {
- // remove possible duplicate key entries from iterator
- if (seekHigherKeyInIter(minKey, iter) == null) {
- // no more keys left in this iterator
- iter.close();
- iterator.remove();
- }
- }
- }
-
- //seek next key in current iter
- if (currentIter != null && seekHigherKeyInIter(minKey, currentIter) == null) {
- // no more keys left in this iterator
- currentIter.close();
- iters.remove(currentIter);
- }
- }
-
- private ByteBuffer seekHigherKeyInIter(ByteBuffer key, HFileSortedIterator iter) throws IOException {
- ByteBuffer newK = iter.getKeyBB();
-
-    // remove all duplicates by advancing the iterator while a key is less than
-    // or equal to the current key
- while (ByteComparator.compareBytes(newK.array(), newK.arrayOffset(), newK.remaining(), key.array(), key.arrayOffset(), key.remaining()) <= 0) {
- entriesRemaining--;
- if (iter.hasNext()) {
- newK = iter.nextBB();
- } else {
- newK = null;
- break;
- }
- }
- return newK;
- }
-
- @Override
- public ByteBuffer getKey() {
- return getKeyBB();
- }
- public ByteBuffer getKeyBB() {
- if (currentKey == null) {
- throw new IllegalStateException();
- }
- return currentKey;
- }
-
- @Override
- public ByteBuffer getValue() {
- return getValueBB();
- }
- public ByteBuffer getValueBB() {
- if (currentValue == null) {
- throw new IllegalStateException();
- }
- return currentValue;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- for (HoplogIterator<byte[], byte[]> iter : iters) {
- iter.close();
- }
- }
-
- public int getRemainingEntryCount() {
- return entriesRemaining;
- }
-}
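
seekToMinKeyIter and seekHigherKeyInIter together implement a k-way merge that drops duplicate keys across files. A self-contained sketch of the same idea over plain sorted string iterators (illustrative only; MergeSketch and PeekingIter are hypothetical names, not Geode code):

    import java.util.*;

    // Illustration of the duplicate-dropping k-way merge used above,
    // over plain sorted string iterators instead of hoplog scanners.
    public class MergeSketch {
      public static List<String> merge(List<Iterator<String>> iters) {
        List<String> out = new ArrayList<>();
        List<PeekingIter> heads = new ArrayList<>();
        for (Iterator<String> it : iters) {
          if (it.hasNext()) heads.add(new PeekingIter(it)); // skip empty inputs
        }
        while (!heads.isEmpty()) {
          // find the iterator whose head is the smallest key
          PeekingIter min = Collections.min(heads,
              Comparator.comparing((PeekingIter p) -> p.head));
          String key = min.head;
          out.add(key);
          // advance every iterator past this key, removing duplicates;
          // drop iterators that run out of entries
          heads.removeIf(p -> !p.skipPast(key));
        }
        return out;
      }

      static class PeekingIter {
        final Iterator<String> it;
        String head;
        PeekingIter(Iterator<String> it) { this.it = it; this.head = it.next(); }
        // move head past 'key'; return false when the iterator is exhausted
        boolean skipPast(String key) {
          while (head.compareTo(key) <= 0) {
            if (!it.hasNext()) return false;
            head = it.next();
          }
          return true;
        }
      }
    }
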
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
deleted file mode 100644
index 789a616..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Reads a sorted oplog file or a merged set of sorted oplogs.
- */
-public interface HoplogSetReader<K, V> {
- /**
- * Returns the value associated with the given key.
- */
- V read(K key) throws IOException;
-
- /**
-   * Iterates over the entire contents of the sorted file.
- *
- * @return the sorted iterator
- * @throws IOException
- */
- HoplogIterator<K, V> scan() throws IOException;
-
- /**
- * Scans the available keys and allows iteration over the interval [from, to) where the starting
- * key is included and the ending key is excluded from the results.
- *
- * @param from
- * the start key
- * @param to
- * the end key
- * @return the sorted iterator
- * @throws IOException
- */
- HoplogIterator<K, V> scan(K from, K to) throws IOException;
-
- /**
- * Scans the keys and allows iteration between the given keys.
- *
- * @param from
- * the start key
- * @param fromInclusive
- * true if the start key is included in the scan
- * @param to
- * the end key
- * @param toInclusive
- * true if the end key is included in the scan
- * @return the sorted iterator
- * @throws IOException
- */
- HoplogIterator<K, V> scan(K from, boolean fromInclusive, K to, boolean toInclusive) throws IOException;
-
-
- /**
-   * Scans the available keys and allows iteration over the byte range
-   * specified by the given offset and length.
-   *
- * @param startOffset
- * the start offset
- * @param length
- * bytes to read
- * @return the sorted iterator
- * @throws IOException
- */
- HoplogIterator<K, V> scan(long startOffset, long length) throws IOException;
-
- /**
-   * Uses a cardinality estimator to provide an approximate number of entries.
-   *
-   * @return the approximate number of entries
- */
- long sizeEstimate();
-
- /**
- * Returns true if the reader has been closed.
- * @return true if closed
- */
- boolean isClosed();
-
- /**
- * Allows sorted iteration through a set of keys and values.
- */
- public interface HoplogIterator<K, V> {
- K getKey();
-
- V getValue();
-
- /** moves to next element and returns the key object */
- K next() throws IOException;
-
- boolean hasNext();
-
- void close();
-
- void remove();
- }
-}
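
A short usage sketch of the reader contract, assuming a HoplogSetReader<byte[], byte[]> named reader and two existing keys from and to; per the javadoc above, the two-argument scan iterates the half-open interval [from, to):

    // Sketch: iterate the half-open key interval [from, to) of a reader.
    HoplogIterator<byte[], byte[]> iter = reader.scan(from, to);
    try {
      while (iter.hasNext()) {
        byte[] key = iter.next();      // advances and returns the key
        byte[] value = iter.getValue();
        // ... process key/value ...
      }
    } finally {
      iter.close();
    }
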
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
deleted file mode 100644
index a2926ff..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-import com.gemstone.gemfire.internal.hll.ICardinality;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Reader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.internal.Version;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Implements a sequence file based {@link Hoplog}.
- */
-public class SequenceFileHoplog extends AbstractHoplog {
-
- public SequenceFileHoplog(FileSystem inputFS, Path filePath,
- SortedOplogStatistics stats)
- throws IOException
- {
- super(inputFS, filePath, stats);
- }
- @Override
- public void close() throws IOException {
- // Nothing to do
- }
-
- @Override
- public HoplogReader getReader() throws IOException {
- return new SequenceFileReader();
- }
-
-  /**
-   * Gets the writer for the sequence file.
-   *
-   * @param keys not used by SequenceFileHoplog
-   */
-  @Override
- public HoplogWriter createWriter(int keys) throws IOException {
- return new SequenceFileHoplogWriter();
- }
-
- @Override
- public boolean isClosed() {
- return false;
- }
-
- @Override
- public void close(boolean clearCache) throws IOException {
- // Nothing to do
- }
-
- /**
-   * Currently, hsync does not update the file size on the namenode. So, if the
-   * process previously died after calling hsync but before closing the file, the
-   * file is left with an inconsistent file size. As a workaround, open the file
-   * stream in append mode and close it. This fixes the file size on the namenode.
- *
- * @throws IOException
- * @return true if the file size was fixed
- */
- public boolean fixFileSize() throws IOException {
-    // Try to fix the file size.
-    // Loop so that the expected exceptions can be ignored up to 3 times.
- if (logger.isDebugEnabled())
- logger.debug("{}Fixing size of hoplog " + path, logPrefix);
- Exception e = null;
- boolean exceptionThrown = false;
- for (int i =0; i < 3; i++) {
- try {
- FSDataOutputStream stream = fsProvider.getFS().append(path);
- stream.close();
- stream = null;
- } catch (IOException ie) {
- exceptionThrown = true;
- e = ie;
- if (logger.isDebugEnabled())
- logger.debug("{}Retry run " + (i + 1) + ": Hoplog " + path + " is still a temporary " +
- "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
- "fix the hoplog because an exception was thrown " + e, logPrefix );
- }
-      // Either RecoveryInProgressException or AlreadyBeingCreatedException
-      // was thrown, so wait for some time before the next retry.
- if (exceptionThrown) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- }
- exceptionThrown = false;
- } else {
-        // no exception was thrown; the file size is fixed
- return true;
- }
- }
- logger.info (logPrefix, LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + path + " is still a temporary " +
- "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
- "fix the hoplog because an exception was thrown " + e));
-
- return false;
- }
-
- @Override
- public String toString() {
- return "SequenceFileHplog[" + getFileName() + "]";
- }
-
- private class SequenceFileHoplogWriter implements HoplogWriter {
-
- private SequenceFile.Writer writer = null;
-
- public SequenceFileHoplogWriter() throws IOException{
- writer = AbstractHoplog.getSequenceFileWriter(path, conf, logger);
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- if (logger.isDebugEnabled())
- logger.debug("{}Completed creating hoplog " + path, logPrefix);
- }
-
- @Override
- public void hsync() throws IOException {
- writer.hsyncWithSizeUpdate();
- if (logger.isDebugEnabled())
- logger.debug("{}hsync'ed a batch of data to hoplog " + path, logPrefix);
- }
-
- @Override
- public void append(byte[] key, byte[] value) throws IOException {
- writer.append(new BytesWritable(key), new BytesWritable(value));
- }
-
- @Override
- public void append(ByteBuffer key, ByteBuffer value) throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public void close(EnumMap<Meta, byte[]> metadata) throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
- @Override
- public long getCurrentSize() throws IOException {
- return writer.getLength();
- }
-
- }
- /**
- * Sequence file reader. This is currently to be used only by MapReduce jobs and
-   * test functions.
- */
- public class SequenceFileReader implements HoplogReader, Closeable {
- @Override
- public byte[] read(byte[] key) throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public HoplogIterator<byte[], byte[]> scan()
- throws IOException {
- return new SequenceFileIterator(fsProvider.getFS(), path, 0, Long.MAX_VALUE, conf, logger);
- }
-
- @Override
- public HoplogIterator<byte[], byte[]> scan(
- byte[] from, byte[] to) throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public HoplogIterator<byte[], byte[]> scan(
- long startOffset, long length) throws IOException {
- return new SequenceFileIterator(fsProvider.getFS(), path, startOffset, length, conf, logger);
- }
-
- @Override
- public HoplogIterator<byte[], byte[]> scan(
- byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive)
- throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public boolean isClosed() {
- throw new UnsupportedOperationException("Not supported for Sequence files.");
- }
-
- @Override
- public void close() throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files. Close the iterator instead.");
- }
-
- @Override
- public ByteBuffer get(byte[] key) throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public BloomFilter getBloomFilter() throws IOException {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public long getEntryCount() {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public ICardinality getCardinalityEstimator() {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public long sizeEstimate() {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
-
- }
-
- /**
- * Sequence file iterator. This is currently to be used only by MapReduce jobs and
-   * test functions.
- */
- public static class SequenceFileIterator implements HoplogIterator<byte[], byte[]> {
-
- SequenceFile.Reader reader = null;
- private BytesWritable prefetchedKey = null;
- private BytesWritable prefetchedValue = null;
- private byte[] currentKey;
- private byte[] currentValue;
- boolean hasNext = false;
- Logger logger;
- Path path;
- private long start;
- private long end;
-
- public SequenceFileIterator(FileSystem fs, Path path, long startOffset,
- long length, Configuration conf, Logger logger)
- throws IOException {
- Reader.Option optPath = SequenceFile.Reader.file(path);
-
-      // Hadoop has a configuration parameter, io.serializations, that lists
-      // serialization classes used for obtaining serializers and deserializers.
-      // By default this parameter contains Avro classes. When a sequence file is
-      // created, it calls SerializationFactory.getSerializer(keyclass), which
-      // instantiates, via reflection, all the classes listed in io.serializations.
-      // But since no Avro class is available on the classpath, it throws an
-      // exception. Before creating the sequence file reader, override the
-      // io.serializations parameter and pass only the classes that matter to us.
-      String[] serializations = conf.getStrings("io.serializations",
- new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
- conf.setStrings("io.serializations",
- new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
- // create reader
- boolean emptyFile = false;
- try {
- reader = new SequenceFile.Reader(conf, optPath);
-      } catch (EOFException e) {
-        // this is OK, as the file has ended; just report no more records available
- emptyFile = true;
- }
- // reset the configuration to its original value
- conf.setStrings("io.serializations", serializations);
- this.logger = logger;
- this.path = path;
-
- if (emptyFile) {
- hasNext = false;
- } else {
- // The file should be read from the first sync marker after the start position and
- // until the first sync marker after the end position is seen.
- this.end = startOffset + length;
- if (startOffset > reader.getPosition()) {
- reader.sync(startOffset); // sync to start
- }
- this.start = reader.getPosition();
- this.hasNext = this.start < this.end;
- if (hasNext)
- readNext();
- }
- }
-
-
- public Version getVersion(){
- String version = reader.getMetadata().get(new Text(Meta.GEMFIRE_VERSION.name())).toString();
- return Version.fromOrdinalOrCurrent(Short.parseShort(version));
- }
- @Override
- public boolean hasNext() {
- return hasNext;
- }
-
- @Override
- public byte[] next() {
- currentKey = prefetchedKey.getBytes();
- currentValue = prefetchedValue.getBytes();
-
- readNext();
-
- return currentKey;
- }
-
- private void readNext() {
- try {
- long pos = reader.getPosition();
- prefetchedKey = new BytesWritable();
- prefetchedValue = new BytesWritable();
- hasNext = reader.next(prefetchedKey, prefetchedValue);
- // The file should be read from the first sync marker after the start position and
- // until the first sync marker after the end position is seen.
- if (pos >= end && reader.syncSeen()) {
- hasNext = false;
- }
- } catch (EOFException e) {
-        // this is OK, as the file has ended; just report no more records available
- hasNext = false;
- }
- catch (IOException e) {
- hasNext = false;
- logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
- throw new HDFSIOException(
- LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
- }
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Not supported for Sequence files");
- }
-
- @Override
- public void close() {
- IOUtils.closeStream(reader);
- }
-
- @Override
- public byte[] getKey() {
- return currentKey;
- }
-
- @Override
- public byte[] getValue() {
- return currentValue;
- }
-
- /** Returns true iff the previous call to next passed a sync mark.*/
- public boolean syncSeen() { return reader.syncSeen(); }
-
- /** Return the current byte position in the input file. */
- public synchronized long getPosition() throws IOException {
- return reader.getPosition();
- }
- }
-}
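
The io.serializations handling in SequenceFileIterator is independent of Geode and can be sketched against stock Hadoop, assuming a Configuration conf and a Path path (this uses org.apache.hadoop.io.SequenceFile rather than the Geode-internal copy imported above):

    // Sketch: read a sequence file with io.serializations restricted to
    // WritableSerialization, restoring the original setting afterwards.
    String[] saved = conf.getStrings("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization");
    conf.setStrings("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization");
    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
      BytesWritable key = new BytesWritable();
      BytesWritable value = new BytesWritable();
      while (reader.next(key, value)) {
        // ... consume key/value ...
      }
    } finally {
      conf.setStrings("io.serializations", saved);
      IOUtils.closeStream(reader);
    }
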
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
deleted file mode 100644
index f5b63cc..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HDFSSplitIterator;
-
-public class AbstractGFRecordReader
- extends
- com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.AbstractGFRecordReader
- implements RecordReader<GFKey, PersistedEventImpl> {
-
- /**
-   * Initializes an instance of the record reader using the file split and job
-   * configuration.
-   *
-   * @param split the combined file split to read
-   * @param conf the job configuration
- * @throws IOException
- */
- public void initialize(CombineFileSplit split, JobConf conf) throws IOException {
- CombineFileSplit cSplit = (CombineFileSplit) split;
- Path[] path = cSplit.getPaths();
- long[] start = cSplit.getStartOffsets();
- long[] len = cSplit.getLengths();
-
- FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
-    this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0L, 0L);
- }
-
- @Override
- public boolean next(GFKey key, PersistedEventImpl value) throws IOException {
- /*
- * if there are more records in the hoplog, iterate to the next record. Set
- * key object as is.
- */
-
- if (!super.hasNext()) {
- key.setKey(null);
- // TODO make value null;
- return false;
- }
-
- super.next();
-
- key.setKey(super.getKey().getKey());
- PersistedEventImpl usersValue = super.getValue();
- value.copy(usersValue);
- return true;
- }
-
- @Override
- public GFKey createKey() {
- return new GFKey();
- }
-
- @Override
- public PersistedEventImpl createValue() {
- if(this.isSequential) {
- return new UnsortedHoplogPersistedEvent();
- } else {
- return new SortedHoplogPersistedEvent();
- }
- }
-
- @Override
- public long getPos() throws IOException {
- // there is no efficient way to find the position of key in hoplog file.
- return 0;
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- }
-
- @Override
- public float getProgress() throws IOException {
- return super.getProgressRatio();
- }
-}
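
For reference, the consumption loop the old mapred API expects from this reader, assuming reader is an initialized AbstractGFRecordReader:

    // Sketch: drain a mapred RecordReader.
    GFKey key = reader.createKey();
    PersistedEventImpl value = reader.createValue();
    while (reader.next(key, value)) {
      // key.getKey() holds the deserialized region key;
      // value holds the persisted event copied in next()
    }
    reader.close();
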
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
deleted file mode 100644
index 0e0e455..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter;
-
-public class GFInputFormat extends
- com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFInputFormat
- implements InputFormat<GFKey, PersistedEventImpl>, JobConfigurable {
-
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- this.conf = job;
-
- Collection<FileStatus> hoplogs = getHoplogs();
- return createSplits(job, hoplogs);
- }
-
- /**
- * Creates an input split for every block occupied by hoplogs of the input
- * regions
- *
- * @param job
- * @param hoplogs
- * @return array of input splits of type file input split
- * @throws IOException
- */
- private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
- throws IOException {
- if (hoplogs == null || hoplogs.isEmpty()) {
- return new InputSplit[0];
- }
-
- HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
- List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
- InputSplit[] splits = new InputSplit[mr2Splits.size()];
- int i = 0;
- for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
-      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Split;
-      mr2Split = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;
-
-      CombineFileSplit split = new CombineFileSplit(job, mr2Split.getPaths(),
-          mr2Split.getStartOffsets(), mr2Split.getLengths(),
-          mr2Split.getLocations());
- splits[i] = split;
- i++;
- }
-
- return splits;
- }
-
- @Override
- public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter) throws IOException {
-
- CombineFileSplit cSplit = (CombineFileSplit) split;
- AbstractGFRecordReader reader = new AbstractGFRecordReader();
- reader.initialize(cSplit, job);
- return reader;
- }
-
- @Override
- public void configure(JobConf job) {
- this.conf = job;
- }
-}
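
A sketch of wiring this input format into an old-API job; the configuration keys are inherited from the mapreduce-level GFInputFormat that appears later in this patch, and the region name and home directory values are hypothetical examples:

    // Sketch: configure a mapred job to read a Geode HDFS region.
    JobConf job = new JobConf();
    job.setInputFormat(GFInputFormat.class);
    job.set(GFInputFormat.INPUT_REGION, "exampleRegion");  // hypothetical region
    job.set(GFInputFormat.HOME_DIR, "gemfire");            // hypothetical store home dir
    job.setBoolean(GFInputFormat.CHECKPOINT, true);
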
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
deleted file mode 100644
index 1494e9f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.client.ClientCache;
-
-/**
- * Output format for GemFire. The records provided to writers created by this
- * output format are PUT into a live GemFire cluster.
- */
-public class GFOutputFormat extends
- com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFOutputFormat
- implements OutputFormat<Object, Object> {
-
- @Override
- public RecordWriter<Object, Object> getRecordWriter(
- FileSystem ignored, JobConf job, String name, Progressable progress)
- throws IOException {
- ClientCache cache = getClientCacheInstance(job);
- return new GFRecordWriter(cache, job);
- }
-
- @Override
- public void checkOutputSpecs(FileSystem ignored, JobConf job)
- throws IOException {
- validateConfiguration(job);
- }
-
- public class GFRecordWriter implements RecordWriter<Object, Object> {
- private ClientCache clientCache;
- private Region<Object, Object> region;
-
- public GFRecordWriter(ClientCache cache, Configuration conf) {
- this.clientCache = cache;
- region = getRegionInstance(conf, clientCache);
- }
-
- @Override
- public void write(Object key, Object value) throws IOException {
- executePut(region, key, value);
- }
-
- @Override
- public void close(Reporter reporter) throws IOException {
- closeClientCache(clientCache);
- // TODO update reporter
- }
- }
-}
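
The matching output side for the old API, as a sketch; the keys are inherited from the mapreduce-level GFOutputFormat that appears later in this patch, and the host and region names are hypothetical:

    // Sketch: configure a mapred job to write its output into a GemFire region.
    JobConf job = new JobConf();
    job.setOutputFormat(GFOutputFormat.class);
    job.set(GFOutputFormat.REGION, "exampleRegion");      // required by checkOutputSpecs
    job.set(GFOutputFormat.LOCATOR_HOST, "locator-host"); // hypothetical host
    job.setInt(GFOutputFormat.LOCATOR_PORT, 10334);       // default locator port
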
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
deleted file mode 100644
index 2c71b18..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-
-public class AbstractGFRecordReader extends
- RecordReader<GFKey, PersistedEventImpl> {
-
- // constant overhead of each KV in hfile. This is used in computing the
- // progress of record reader
- protected long RECORD_OVERHEAD = 8;
-
- // accounting for number of bytes already read from the hfile
- private long bytesRead;
-
- protected boolean isSequential;
-
- protected HDFSSplitIterator splitIterator;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- CombineFileSplit cSplit = (CombineFileSplit) split;
- Path[] path = cSplit.getPaths();
- long[] start = cSplit.getStartOffsets();
- long[] len = cSplit.getLengths();
-
- Configuration conf = context.getConfiguration();
- FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
-
-    this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0L, 0L);
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return next();
- }
-
- protected boolean next() throws IOException {
- if (!hasNext()) {
- return false;
- }
-
- splitIterator.next();
- bytesRead += (splitIterator.getKey().length + splitIterator.getValue().length);
- bytesRead += RECORD_OVERHEAD;
- return true;
- }
-
- protected boolean hasNext() throws IOException {
- return splitIterator.hasNext();
- }
-
- @Override
- public GFKey getCurrentKey() throws IOException, InterruptedException {
- return getKey();
- }
-
- protected GFKey getKey() throws IOException {
- try {
- GFKey key = new GFKey();
- key.setKey(BlobHelper.deserializeBlob(splitIterator.getKey()));
- return key;
- } catch (ClassNotFoundException e) {
- // TODO resolve logging
- return null;
- }
- }
-
- @Override
- public PersistedEventImpl getCurrentValue() throws IOException,
- InterruptedException {
- return getValue();
- }
-
- protected PersistedEventImpl getValue() throws IOException {
- try {
- byte[] valueBytes = splitIterator.getValue();
- if(isSequential) {
- return UnsortedHoplogPersistedEvent.fromBytes(valueBytes);
- } else {
- return SortedHoplogPersistedEvent.fromBytes(valueBytes);
- }
- } catch (ClassNotFoundException e) {
- // TODO resolve logging
- return null;
- }
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return getProgressRatio();
- }
-
- protected float getProgressRatio() throws IOException {
- if (!splitIterator.hasNext()) {
- return 1.0f;
- } else if (bytesRead > splitIterator.getLength()) {
-      // the record reader is expected to read more bytes, since it continues
-      // until the beginning of the next block; once that extra reading has
-      // started, return a fixed value
- return 0.95f;
- } else {
- return Math.min(1.0f, bytesRead / (float) (splitIterator.getLength()));
- }
- }
-
- @Override
- public void close() throws IOException {
- splitIterator.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
deleted file mode 100644
index ff64ceb..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter;
-
-public class GFInputFormat extends InputFormat<GFKey, PersistedEventImpl>
- implements Configurable {
- public static final String HOME_DIR = "mapreduce.input.gfinputformat.homedir";
- public static final String INPUT_REGION = "mapreduce.input.gfinputformat.inputregion";
- public static final String START_TIME = "mapreduce.input.gfinputformat.starttime";
- public static final String END_TIME = "mapreduce.input.gfinputformat.endtime";
- public static final String CHECKPOINT = "mapreduce.input.gfinputformat.checkpoint";
-
- protected Configuration conf;
-
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- this.conf = job.getConfiguration();
-
- Collection<FileStatus> hoplogs = getHoplogs();
- return createSplits(hoplogs);
- }
-
- /**
- * Identifies filters provided in the job configuration and creates a list of
- * sorted hoplogs. If there are no sorted hoplogs, checks if the region has
-   * sequential hoplogs.
- *
- * @return list of hoplogs
- * @throws IOException
- */
- protected Collection<FileStatus> getHoplogs() throws IOException {
- String regionName = conf.get(INPUT_REGION);
- System.out.println("GFInputFormat: Region Name is " + regionName);
- if (regionName == null || regionName.trim().isEmpty()) {
- // incomplete job configuration, region name must be provided
- return new ArrayList<FileStatus>();
- }
-
- String home = conf.get(HOME_DIR, HDFSStore.DEFAULT_HOME_DIR);
- regionName = HdfsRegionManager.getRegionFolder(regionName);
- Path regionPath = new Path(home + "/" + regionName);
- FileSystem fs = regionPath.getFileSystem(conf);
-
-    long start = conf.getLong(START_TIME, 0L);
-    long end = conf.getLong(END_TIME, 0L);
- boolean checkpoint = conf.getBoolean(CHECKPOINT, true);
-
- // if the region contains flush hoplogs then the region is of type RW.
- Collection<FileStatus> hoplogs;
- hoplogs = HoplogUtil.filterHoplogs(fs, regionPath, start, end, checkpoint);
- return hoplogs == null ? new ArrayList<FileStatus>() : hoplogs;
- }
-
- /**
- * Creates an input split for every block occupied by hoplogs of the input
- * regions
- *
- * @param hoplogs
- * @return list of input splits of type file input split
- * @throws IOException
- */
- private List<InputSplit> createSplits(Collection<FileStatus> hoplogs)
- throws IOException {
- List<InputSplit> splits = new ArrayList<InputSplit>();
- if (hoplogs == null || hoplogs.isEmpty()) {
- return splits;
- }
-
- HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
- return splitter.getOptimizedSplits(conf);
- }
-
- @Override
- public RecordReader<GFKey, PersistedEventImpl> createRecordReader(
- InputSplit split, TaskAttemptContext context) throws IOException,
- InterruptedException {
- return new AbstractGFRecordReader();
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-}
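
A new-API usage sketch for this input format; the region and job names are hypothetical, and START_TIME/END_TIME are set to 0, the default that getHoplogs() above reads when no filter is configured:

    // Sketch: configure a mapreduce (new API) job against GFInputFormat.
    Configuration conf = new Configuration();
    conf.set(GFInputFormat.INPUT_REGION, "exampleRegion"); // hypothetical region
    conf.setLong(GFInputFormat.START_TIME, 0L);
    conf.setLong(GFInputFormat.END_TIME, 0L);
    Job job = Job.getInstance(conf, "read-geode-hoplogs"); // hypothetical job name
    job.setInputFormatClass(GFInputFormat.class);
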
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java
deleted file mode 100644
index 5bba2c7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-import com.gemstone.gemfire.internal.util.BlobHelper;
-
-public class GFKey implements WritableComparable<GFKey> {
- private Object key;
-
- public Object getKey() {
- return key;
- }
-
- public void setKey(Object key) {
- this.key = key;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- byte[] bytes = BlobHelper.serializeToBlob(key);
- out.writeInt(bytes.length);
- out.write(bytes, 0, bytes.length);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int len = in.readInt();
- byte[] bytes = new byte[len];
- in.readFully(bytes, 0, len);
- try {
- key = BlobHelper.deserializeBlob(bytes);
-    } catch (ClassNotFoundException e) {
-      // surface the failure instead of swallowing it
-      throw new IOException("Failed to deserialize key", e);
-    }
- }
-
- @Override
- public int compareTo(GFKey o) {
- try {
- byte[] b1 = BlobHelper.serializeToBlob(key);
- byte[] b2 = BlobHelper.serializeToBlob(o.key);
- return WritableComparator.compareBytes(b1, 0, b1.length, b2, 0, b2.length);
-    } catch (IOException e) {
-      // serialization failed; returning 0 here would silently corrupt the
-      // sort order, so fail fast instead
-      throw new IllegalStateException("Failed to serialize key for comparison", e);
-    }
- }
-}
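
Note that GFKey orders keys by the byte order of their serialized blobs, not by the keys' natural ordering. A tiny sketch of what that means in practice (the string keys are hypothetical):

    // Sketch: GFKey comparison goes through BlobHelper serialization.
    GFKey a = new GFKey();
    a.setKey("apple");    // hypothetical key
    GFKey b = new GFKey();
    b.setKey("banana");   // hypothetical key
    // compares the serialized byte arrays, not the strings themselves
    int cmp = a.compareTo(b);
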
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java
deleted file mode 100644
index 3be2ab0..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionExistsException;
-import com.gemstone.gemfire.cache.client.ClientCache;
-import com.gemstone.gemfire.cache.client.ClientCacheFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.management.internal.cli.converters.ConnectionEndpointConverter;
-
-/**
- * Output format for GemFire. The records provided to writers created by this
- * output format are PUT into a live GemFire cluster.
- */
-public class GFOutputFormat extends OutputFormat<Object, Object> {
- public static final String REGION = "mapreduce.output.gfoutputformat.outputregion";
- public static final String LOCATOR_HOST = "mapreduce.output.gfoutputformat.locatorhost";
- public static final String LOCATOR_PORT = "mapreduce.output.gfoutputformat.locatorport";
- public static final String SERVER_HOST = "mapreduce.output.gfoutputformat.serverhost";
- public static final String SERVER_PORT = "mapreduce.output.gfoutputformat.serverport";
-
- @Override
- public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- ClientCache cache = getClientCacheInstance(conf);
- return new GFRecordWriter(cache, context.getConfiguration());
- }
-
- public ClientCache getClientCacheInstance(Configuration conf) {
-    // if a server host is provided, connect directly to that server;
-    // otherwise connect through a locator (the default locator is used when
-    // no locator host is configured)
- ClientCache cache;
- String serverHost = conf.get(SERVER_HOST);
- if (serverHost == null || serverHost.isEmpty()) {
- cache = createGFWriterUsingLocator(conf);
- } else {
- cache = createGFWriterUsingServer(conf);
- }
- return cache;
- }
-
- /**
-   * Creates an instance of {@link ClientCache} by connecting to the GemFire
-   * cluster through a locator.
- */
- public ClientCache createGFWriterUsingLocator(Configuration conf) {
- // if locator host is not provided assume localhost
- String locator = conf.get(LOCATOR_HOST,
- ConnectionEndpointConverter.DEFAULT_LOCATOR_HOST);
- // if locator port is not provided assume default locator port 10334
- int port = conf.getInt(LOCATOR_PORT,
- ConnectionEndpointConverter.DEFAULT_LOCATOR_PORT);
-
- // create gemfire client cache instance
- ClientCacheFactory ccf = new ClientCacheFactory();
- ccf.addPoolLocator(locator, port);
- ClientCache cache = ccf.create();
- return cache;
- }
-
- /**
-   * Creates an instance of {@link ClientCache} by connecting to the GemFire
-   * cluster through a GemFire server.
- */
- public ClientCache createGFWriterUsingServer(Configuration conf) {
- String server = conf.get(SERVER_HOST);
- // if server port is not provided assume default server port, 40404
- int port = conf.getInt(SERVER_PORT, CacheServer.DEFAULT_PORT);
-
- // create gemfire client cache instance
- ClientCacheFactory ccf = new ClientCacheFactory();
- ccf.addPoolServer(server, port);
- ClientCache cache = ccf.create();
- return cache;
- }
-
- public Region<Object, Object> getRegionInstance(Configuration conf,
- ClientCache cache) {
- Region<Object, Object> region;
-
- // create a client region in PROXY mode; all operations are forwarded to the servers
- String regionName = conf.get(REGION);
- ClientRegionFactory<Object, Object> regionFactory = cache
- .createClientRegionFactory(ClientRegionShortcut.PROXY);
- try {
- region = regionFactory.create(regionName);
- } catch (RegionExistsException e) {
- region = cache.getRegion(regionName);
- }
-
- return region;
- }
-
- /**
- * Puts a single key-value pair into the region
- * @param region target region
- * @param key entry key
- * @param value entry value
- */
- public void executePut(Region<Object, Object> region, Object key, Object value) {
- region.put(key, value);
- }
-
- /**
- * Closes the client cache instance if it is not already closed
- * @param clientCache cache instance to close, may be null
- */
- public void closeClientCache(ClientCache clientCache) {
- if (clientCache != null && !clientCache.isClosed()) {
- clientCache.close();
- }
- }
-
- /**
- * Validates correctness and completeness of the job's output configuration
- *
- * @param conf job configuration
- * @throws InvalidJobConfException if a mandatory setting is missing
- */
- protected void validateConfiguration(Configuration conf)
- throws InvalidJobConfException {
- // User must configure the output region name.
- String region = conf.get(REGION);
- if (region == null || region.trim().isEmpty()) {
- throw new InvalidJobConfException("Output Region name not provided.");
- }
-
- // TODO validate if a client connected to gemfire cluster can be created
- }
-
- @Override
- public void checkOutputSpecs(JobContext context) throws IOException,
- InterruptedException {
- Configuration conf = context.getConfiguration();
- validateConfiguration(conf);
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
- context);
- }
-
- public class GFRecordWriter extends RecordWriter<Object, Object> {
- private ClientCache clientCache;
- private Region<Object, Object> region;
-
- public GFRecordWriter(ClientCache cache, Configuration conf) {
- this.clientCache = cache;
- region = getRegionInstance(conf, clientCache);
- }
-
- @Override
- public void write(Object key, Object value) throws IOException,
- InterruptedException {
- executePut(region, key, value);
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
- closeClientCache(clientCache);
- }
- }
-}
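
For reference, a MapReduce job was wired to this output format roughly as in
the sketch below. This is a minimal, hypothetical driver: the class name, job
name, region name and locator endpoint are illustrative, and only the
configuration keys defined by GFOutputFormat are assumed.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFOutputFormat;

    public class GemFireJobDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Region that receives the job output. This key is mandatory;
        // checkOutputSpecs rejects the job if it is missing.
        conf.set(GFOutputFormat.REGION, "mapreduce_results");
        // Locator endpoint; if omitted, the defaults apply.
        conf.set(GFOutputFormat.LOCATOR_HOST, "locator.example.com");
        conf.setInt(GFOutputFormat.LOCATOR_PORT, 10334);

        Job job = Job.getInstance(conf, "gemfire-output-example");
        job.setOutputFormatClass(GFOutputFormat.class);
        // ... mapper/reducer wiring as in any ordinary MapReduce job ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
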
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
deleted file mode 100644
index 869ad0d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * Iterates over the records in part of a hoplog. The iterator is passed from
- * the MapReduce job into the GemFireXD LanguageConnectionContext, which uses
- * it as the record source during the map phase.
- *
- */
-public abstract class HDFSSplitIterator {
- // data object holding the path, offset and length of every block this
- // iterator needs to visit
- private CombineFileSplit split;
-
- // the following members track the hoplog that is currently being iterated
- private int currentHopIndex = 0;
- private AbstractHoplog hoplog;
- protected HoplogIterator<byte[], byte[]> iterator;
- byte[] key;
- byte[] value;
-
- private long bytesRead;
- protected long RECORD_OVERHEAD = 8; // per-record overhead, in bytes, added to the bytes-read metric
-
- private long startTime = 0L;
- private long endTime = 0L;
-
- protected FileSystem fs;
- private static final Logger logger = LogService.getLogger();
- protected final String logPrefix = "<HDFSSplitIterator> ";
-
- public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets, long[] lengths, long startTime, long endTime) throws IOException {
- this.fs = fs;
- this.split = new CombineFileSplit(paths, offsets, lengths, null);
- while(currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))){
- logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex)));
- currentHopIndex++;
- }
- if(currentHopIndex == split.getNumPaths()){
- this.hoplog = null;
- iterator = null;
- } else {
- this.hoplog = getHoplog(fs,split.getPath(currentHopIndex));
- iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
- }
- this.startTime = startTime;
- this.endTime = endTime;
- }
-
- /**
- * Get the appropriate iterator for the file type.
- */
- public static HDFSSplitIterator newInstance(FileSystem fs, Path[] path,
- long[] start, long[] len, long startTime, long endTime)
- throws IOException {
- String fileName = path[0].getName();
- if (fileName.endsWith(AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION)) {
- return new StreamSplitIterator(fs, path, start, len, startTime, endTime);
- } else {
- return new RWSplitIterator(fs, path, start, len, startTime, endTime);
- }
- }
-
- public final boolean hasNext() throws IOException {
- while (currentHopIndex < split.getNumPaths()) {
- if (iterator != null) {
- if(iterator.hasNext()) {
- return true;
- } else {
- iterator.close();
- iterator = null;
- hoplog.close();
- hoplog = null;
- }
- }
-
- if (iterator == null) {
- // The iterator is null on the first read, or once all entries of the
- // previous hoplog have been consumed. Create an iterator on the next
- // hoplog.
- currentHopIndex++;
- while (currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))){
- logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex).toString()));
- currentHopIndex++;
- }
- if (currentHopIndex >= split.getNumPaths()) {
- return false;
- }
- hoplog = getHoplog(fs, split.getPath(currentHopIndex));
- iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
- }
- }
-
- return false;
- }
-
- public final boolean next() throws IOException {
- while (hasNext()) {
- key = iterator.next();
- value = iterator.getValue();
- bytesRead += (key.length + value.length);
- bytesRead += RECORD_OVERHEAD;
-
- // if any filter is set, check whether the event's timestamp matches it.
- // The events returned by the iterator may not be time ordered, so the
- // filters must be checked for every record.
- if (startTime > 0 || endTime > 0) {
- try {
- PersistedEventImpl event = getDeserializedValue();
- long timestamp = event.getTimstamp();
- if (startTime > 0L && timestamp < startTime) {
- continue;
- }
-
- if (endTime > 0L && timestamp > endTime) {
- continue;
- }
- } catch (ClassNotFoundException e) {
- throw new HDFSIOException("Error reading from HDFS", e);
- }
- }
-
- return true;
- }
-
- return false;
- }
-
- public final long getBytesRead() {
- return this.bytesRead;
- }
-
- public final byte[] getKey() {
- return key;
- }
-
- public abstract PersistedEventImpl getDeserializedValue()
- throws ClassNotFoundException, IOException;
-
- protected abstract AbstractHoplog getHoplog(FileSystem fs, Path path)
- throws IOException;
-
- public final byte[] getValue() {
- return value;
- }
-
- public final long getLength() {
- return split.getLength();
- }
-
- public void close() throws IOException {
- if (iterator != null) {
- iterator.close();
- iterator = null;
- }
-
- if (hoplog != null) {
- hoplog.close();
- hoplog = null;
- }
- }
-}
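
The removed iterator followed a cursor-style contract: next() advances to the
next record that passes the time filter and returns false at the end of the
split. A hypothetical consumer, assuming fs, paths, starts and lengths come
from the enclosing MapReduce split, would look roughly like this:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HDFSSplitIterator;

    public class SplitConsumer {
      // Sketch only; passing 0, 0 disables the time filter.
      static void consume(FileSystem fs, Path[] paths, long[] starts,
          long[] lengths) throws IOException {
        HDFSSplitIterator it =
            HDFSSplitIterator.newInstance(fs, paths, starts, lengths, 0, 0);
        try {
          while (it.next()) {             // positions the cursor on the next record
            byte[] key = it.getKey();     // raw key bytes
            byte[] value = it.getValue(); // raw value bytes
            // ... deserialize and process the key/value pair ...
          }
        } finally {
          it.close();                     // releases the underlying hoplog reader
        }
      }
    }
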
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
deleted file mode 100644
index c4c0d1c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-
-public class HoplogUtil {
- /**
- * @param regionPath
- * HDFS path of the region
- * @param fs
- * file system associated with the region
- * @param type
- * type of hoplog to be fetched; flush hoplog or sequence hoplog
- * @return All hoplog file paths belonging to the region provided
- * @throws IOException
- */
- public static Collection<FileStatus> getAllRegionHoplogs(Path regionPath,
- FileSystem fs, String type) throws IOException {
- return getRegionHoplogs(regionPath, fs, type, 0, 0);
- }
-
- /**
- * @param regionPath
- * Region path
- * @param fs
- * file system associated with the region
- * @param type
- * type of hoplog to be fetched; flush hoplog or sequence hoplog
- * @param start
- * Exclude files that do not contain records mutated after start time
- * @param end
- * Exclude files that do not contain records mutated before end time
- * @return All hoplog file paths belonging to the region provided
- * @throws IOException
- */
- public static Collection<FileStatus> getRegionHoplogs(Path regionPath,
- FileSystem fs, String type, long start, long end) throws IOException {
- Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
- regionPath, fs, type, start, end);
-
- ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
- for (Collection<FileStatus> bucket : allBuckets) {
- for (FileStatus file : bucket) {
- hoplogs.add(file);
- }
- }
- return hoplogs;
- }
-
- public static Collection<Collection<FileStatus>> getBucketHoplogs(Path regionPath,
- FileSystem fs, String type, long start, long end) throws IOException {
- Collection<Collection<FileStatus>> allBuckets = new ArrayList<Collection<FileStatus>>();
-
- // hoplog file names follow these patterns
- String HOPLOG_NAME_REGEX = AbstractHoplogOrganizer.HOPLOG_NAME_REGEX + type;
- String EXPIRED_HOPLOG_NAME_REGEX = HOPLOG_NAME_REGEX + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
- final Pattern pattern = Pattern.compile(HOPLOG_NAME_REGEX);
- final Pattern expiredPattern = Pattern.compile(EXPIRED_HOPLOG_NAME_REGEX);
-
- Path cleanUpIntervalPath = new Path(regionPath.getParent(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
- long intervalDurationMillis = readCleanUpIntervalMillis(fs, cleanUpIntervalPath);
-
- // a region directory contains directories for individual buckets. A bucket
- // has an integer name.
- FileStatus[] bucketDirs = fs.listStatus(regionPath);
-
- for (FileStatus bucket : bucketDirs) {
- if (!bucket.isDirectory()) {
- continue;
- }
- try {
- Integer.valueOf(bucket.getPath().getName());
- } catch (NumberFormatException e) {
- continue;
- }
-
- ArrayList<FileStatus> bucketHoplogs = new ArrayList<FileStatus>();
-
- // identify all the flush hoplogs and seq hoplogs by visiting all the
- // bucket directories
- FileStatus[] bucketFiles = fs.listStatus(bucket.getPath());
-
- Map<String, Long> expiredHoplogs = getExpiredHoplogs(fs, bucketFiles, expiredPattern);
-
- FileStatus oldestHopAfterEndTS = null;
- long oldestHopTS = Long.MAX_VALUE;
- long currentTimeStamp = System.currentTimeMillis();
- for (FileStatus file : bucketFiles) {
- if (!file.isFile()) {
- continue;
- }
-
- Matcher match = pattern.matcher(file.getPath().getName());
- if (!match.matches()) {
- continue;
- }
-
- long timeStamp = AbstractHoplogOrganizer.getHoplogTimestamp(match);
- if (start > 0 && timeStamp < start) {
- // this hoplog only contains records mutated before the start timestamp
- continue;
- }
-
- if (end > 0 && timeStamp > end) {
- // this hoplog contains records mutated after the end timestamp. Ignore
- // it unless it is the oldest such hoplog.
- if (oldestHopTS > timeStamp) {
- oldestHopTS = timeStamp;
- oldestHopAfterEndTS = file;
- }
- continue;
- }
- long expiredTimeStamp = expiredTime(file, expiredHoplogs);
- if (expiredTimeStamp > 0 && intervalDurationMillis > 0) {
- if ((currentTimeStamp - expiredTimeStamp) > 0.8 * intervalDurationMillis) {
- continue;
- }
- }
- bucketHoplogs.add(file);
- }
-
- if (oldestHopAfterEndTS != null) {
- long expiredTimeStamp = expiredTime(oldestHopAfterEndTS, expiredHoplogs);
- if (expiredTimeStamp <= 0 || intervalDurationMillis <= 0 ||
- (currentTimeStamp - expiredTimeStamp) <= 0.8 * intervalDurationMillis) {
- bucketHoplogs.add(oldestHopAfterEndTS);
- }
- }
-
- if (bucketHoplogs.size() > 0) {
- allBuckets.add(bucketHoplogs);
- }
- }
-
- return allBuckets;
- }
-
- private static Map<String, Long> getExpiredHoplogs(FileSystem fs, FileStatus[] bucketFiles,
- Pattern expiredPattern) throws IOException{
- Map<String, Long> expiredHoplogs = new HashMap<String,Long>();
-
- for(FileStatus file : bucketFiles) {
- if(!file.isFile()) {
- continue;
- }
- String fileName = file.getPath().getName();
- Matcher match = expiredPattern.matcher(fileName);
- if (!match.matches()){
- continue;
- }
- expiredHoplogs.put(fileName,file.getModificationTime());
- }
- return expiredHoplogs;
- }
-
- private static long expiredTime(FileStatus file, Map<String, Long> expiredHoplogs){
- String expiredMarkerName = file.getPath().getName() +
- AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-
- long expiredTimeStamp = -1;
- if (expiredHoplogs.containsKey(expiredMarkerName)) {
- expiredTimeStamp = expiredHoplogs.get(expiredMarkerName);
- }
- return expiredTimeStamp;
- }
-
- public static long readCleanUpIntervalMillis(FileSystem fs, Path cleanUpIntervalPath) throws IOException{
- if (fs.exists(cleanUpIntervalPath)) {
- FSDataInputStream input = fs.open(cleanUpIntervalPath);
- long intervalDurationMillis = input.readLong();
- input.close();
- return intervalDurationMillis;
- } else {
- return -1L;
- }
- }
-
- public static void exposeCleanupIntervalMillis(FileSystem fs, Path path, long intervalDurationMillis){
- FSDataInputStream input = null;
- FSDataOutputStream output = null;
- try {
- if(fs.exists(path)){
- input = fs.open(path);
- if (intervalDurationMillis == input.readLong()) {
- input.close();
- return;
- }
- input.close();
- fs.delete(path, true);
- }
- output = fs.create(path);
- output.writeLong(intervalDurationMillis);
- output.close();
- } catch (IOException e) {
- // publishing the interval file is best effort; failure is non-fatal
- return;
- } finally {
- try {
- if (input != null){
- input.close();
- }
- if (output != null) {
- output.close();
- }
- } catch (IOException e2) {
- // best-effort close; ignore secondary failures
- }
- }
- }
-
- /**
- * @param regionPath HDFS path of the region
- * @param fs file system associated with the region
- * @return list of latest checkpoint files of all buckets in the region
- * @throws IOException
- */
- public static Collection<FileStatus> getCheckpointFiles(Path regionPath,
- FileSystem fs) throws IOException {
- ArrayList<FileStatus> latestSnapshots = new ArrayList<FileStatus>();
-
- Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
- regionPath, fs, AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, 0, 0);
-
- // extract the latest major compacted hoplog from each bucket
- for (Collection<FileStatus> bucket : allBuckets) {
- FileStatus latestSnapshot = null;
- for (FileStatus file : bucket) {
- if (latestSnapshot == null) {
- latestSnapshot = file;
- } else {
- String name1 = latestSnapshot.getPath().getName();
- String name2 = file.getPath().getName();
-
- if (HoplogComparator.compareByName(name1, name2) > 0) {
- latestSnapshot = file;
- }
- }
- }
-
- if (latestSnapshot != null) {
- latestSnapshots.add(latestSnapshot);
- }
- }
-
- return latestSnapshots;
- }
-
- /**
- * Creates a mapping of each hoplog to its HDFS blocks on disk
- *
- * @param config
- * configuration used to resolve the file system
- * @param files
- * list of hoplog file status objects
- * @return map from each hoplog to the HDFS block locations it occupies
- * @throws IOException
- */
- public static Map<FileStatus, BlockLocation[]> getBlocks(Configuration config,
- Collection<FileStatus> files) throws IOException {
- Map<FileStatus, BlockLocation[]> blocks = new HashMap<FileStatus, BlockLocation[]>();
- if (files == null || files.isEmpty()) {
- return blocks;
- }
-
- FileSystem fs = files.iterator().next().getPath().getFileSystem(config);
-
- for (FileStatus hoplog : files) {
- long length = hoplog.getLen();
- BlockLocation[] fileBlocks = fs.getFileBlockLocations(hoplog, 0, length);
- blocks.put(hoplog, fileBlocks);
- }
-
- return blocks;
- }
-
- /**
- * Filters out hoplogs of a region that do not match time filters and creates
- * a list of hoplogs that may be used by hadoop jobs.
- *
- * @param fs
- * file system instance
- * @param path
- * region path
- * @param start
- * start time in milliseconds
- * @param end
- * end time in milliseconds
- * @param snapshot
- * if true latest snapshot hoplog will be included in the final
- * return list
- * @return filtered collection of hoplogs
- * @throws IOException
- */
- public static Collection<FileStatus> filterHoplogs(FileSystem fs, Path path,
- long start, long end, boolean snapshot) throws IOException {
- ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
-
- // if the region contains flush hoplogs or major compacted files then the
- // region is of type RW.
- // check if the intent is to operate on major compacted files only
- if (snapshot) {
- hoplogs.addAll(getCheckpointFiles(path, fs));
- } else {
- hoplogs.addAll(getRegionHoplogs(path, fs,
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, start, end));
- }
-
- if (hoplogs.isEmpty()) {
- // there are no sorted hoplogs. Check if sequence hoplogs are present
- // there is no checkpoint mode for write only tables
- hoplogs.addAll(getRegionHoplogs(path, fs,
- AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION, start, end));
- }
-
- return hoplogs;
- }
-
- private HoplogUtil() {
- //static methods only.
- }
-
- /**
- * Creates MapReduce splits from hoplog files. It leverages
- * CombineFileInputFormat to create node- and rack-locality-aware splits
- *
- */
- public static class HoplogOptimizedSplitter extends CombineFileInputFormat<Long, Long> {
- private Collection<FileStatus> hoplogs;
-
- public HoplogOptimizedSplitter(Collection<FileStatus> hoplogs) {
- this.hoplogs = hoplogs;
- }
-
- @Override
- protected List<FileStatus> listStatus(JobContext job) throws IOException {
- /**
- * listStatus in the superclass collects a FileStatus for each file again
- * and also tries to recursively list files in subdirectories. Neither is
- * needed here: the splitter has already collected the FileStatus of every
- * file, so bypassing the superclass reduces NameNode chatter and improves
- * performance, especially when the NameNode is not colocated.
- */
- return new ArrayList<FileStatus>(hoplogs);
- }
-
- /**
- * Creates a list of splits for the input list of hoplogs. Each split is
- * roughly the size of an HDFS block. The blocks of a hoplog may be smaller
- * than the HDFS block size, e.g. if the hoplog itself is very small. The
- * method keeps adding blocks of a hoplog to a split until the split reaches
- * the HDFS block size, provided the block is local to the split.
- */
- public List<InputSplit> getOptimizedSplits(Configuration conf) throws IOException {
-
- if (hoplogs == null || hoplogs.isEmpty()) {
- return null;
- }
- Path[] paths = new Path[hoplogs.size()];
- int i = 0;
- for (FileStatus file : hoplogs) {
- paths[i] = file.getPath();
- i++;
- }
-
- FileStatus hoplog = hoplogs.iterator().next();
- long blockSize = hoplog.getBlockSize();
- setMaxSplitSize(blockSize);
-
- Job job = Job.getInstance(conf);
- setInputPaths(job, paths);
- List<InputSplit> splits = super.getSplits(job);
-
- // in some cases a split may not get populated with host location
- // information. If such a split is created, fill in the locations of
- // the first file in the split
- ArrayList<CombineFileSplit> newSplits = new ArrayList<CombineFileSplit>();
- for (Iterator<InputSplit> iter = splits.iterator(); iter.hasNext();) {
- CombineFileSplit split = (CombineFileSplit) iter.next();
- if (split.getLocations() != null && split.getLocations().length > 0) {
- continue;
- }
-
- paths = split.getPaths();
- if (paths.length == 0) {
- continue;
- }
- long[] starts = split.getStartOffsets();
- long[] ends = split.getLengths();
-
- FileSystem fs = paths[0].getFileSystem(conf);
- FileStatus file = fs.getFileStatus(paths[0]);
- BlockLocation[] blks = fs.getFileBlockLocations(file, starts[0], ends[0]);
- if (blks != null && blks.length > 0) {
- // hosts found. Need to create a new split and replace the one missing
- // hosts.
- iter.remove();
- String hosts[] = blks[0].getHosts();
- split = new CombineFileSplit(paths, starts, ends, hosts);
- newSplits.add(split);
- }
- }
- splits.addAll(newSplits);
-
- return splits;
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- // a call to this method is invalid. This class is only meant to create
- // optimized splits independent of the api type
- throw new IllegalStateException();
- }
-
- @Override
- public RecordReader<Long, Long> createRecordReader(InputSplit split,
- TaskAttemptContext arg1) throws IOException {
- // Record reader creation is managed by GFInputFormat. This method should
- // not be called
- throw new IllegalStateException();
- }
- }
-}
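
Taken together, these utilities supported a flow like the following sketch: a
Hadoop job filters a region's hoplogs by a time window and then derives
locality-aware splits from them. The region path and time window are
illustrative; only the methods defined above are assumed.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter;

    public class SplitPlanner {
      public static List<InputSplit> planSplits(Configuration conf) throws IOException {
        Path regionPath = new Path("/gemfire/orders"); // illustrative region path
        FileSystem fs = regionPath.getFileSystem(conf);

        // Keep only hoplogs that may contain records from the last hour;
        // false = do not restrict the scan to the latest checkpoint files.
        long end = System.currentTimeMillis();
        long start = end - 60 * 60 * 1000L;
        Collection<FileStatus> hoplogs =
            HoplogUtil.filterHoplogs(fs, regionPath, start, end, false);

        // Derive node- and rack-aware splits, each roughly one HDFS block.
        // Note: getOptimizedSplits returns null when no hoplogs are found.
        HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
        return splitter.getOptimizedSplits(conf);
      }
    }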