You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/19 01:01:18 UTC

[09/17] incubator-geode git commit: GEODE-1072: Removing HDFS related code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
deleted file mode 100644
index f7d746d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.Future;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-
-/**
- * Manages bucket level operations on sorted oplog files including creation, reading, serde, bloom
- * buffering and compaction. Abstracts existence of multiple sorted oplog files
- */
-public interface HoplogOrganizer<T extends PersistedEventImpl> extends HoplogSetReader<byte[], T>,
-    HoplogListener, Closeable {
-
-  /**
-   * Iterates on the input buffer and persists it in a new sorted oplog. This invocation may block
-   * if there are too many outstanding write requests.
-   * 
-   * @param bufferIter
-   *          ordered iterator on a buffer of objects to be persisted
-   * @param count
-   *          number of K,V pairs expected to be part of flush, 0 if unknown
-   * @throws IOException
-   */
-  public void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, int count) 
-      throws IOException, ForceReattemptException;
-  
-  
-  /**
-   * Clear the data in HDFS. This method assumes that the
-   * dispatcher thread has already been paused, so there should be
-   * no concurrent flushes to HDFS when this method is called.
-   * 
-   * @throws IOException
-   */
-  public void clear() throws IOException;
-
-  /**
-   * returns the compactor associated with this set
-   */
-  public Compactor getCompactor();
-  
-  /**
-   * Called to execute bucket maintenance activities, like purge expired files
-   * and create compaction task. Long running activities must be executed
-   * asynchronously, not on this thread, to avoid impact on other buckets
-   * @throws IOException 
-   */
-  public void performMaintenance() throws IOException;
-
-  /**
-   * Schedules a compaction task and returns immediately.
-   * 
-   * @param isMajor true for major compaction, false for minor compaction
-   * @return future for status of compaction request
-   */
-  public Future<CompactionStatus> forceCompaction(boolean isMajor);
-
-  /**
-   * Returns the timestamp of the last completed major compaction
-   * 
-   * @return the timestamp or 0 if a major compaction has not taken place yet
-   */
-  public long getLastMajorCompactionTimestamp();
-
-  public interface Compactor {
-    /**
-     * Requests a compaction operation be performed on this set of sorted oplogs.
-     *
-     * @param isMajor true for major compaction
-     * @param isForced true if the compaction should be carried out even if there
-     * is only one hoplog to compact
-     * 
-     * @return true if compaction was performed, false otherwise
-     * @throws IOException
-     */
-    boolean compact(boolean isMajor, boolean isForced) throws IOException;
-
-    /**
-     * Stop the current compaction operation in the middle and suspend
-     * compaction operations. The current current compaction data
-     * will be thrown away, and no more compaction will be performend
-     * until resume is called. 
-     */
-    void suspend();
-    
-    /**
-     * Resume compaction operations. 
-     */
-    void resume();
-
-    /**
-     * @return true if the compactor is not ready or busy
-     */
-    boolean isBusy(boolean isMajor);
-
-    /**
-     * @return the hdfsStore configuration used by this compactor
-     */
-    public HDFSStore getHdfsStore();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
deleted file mode 100644
index 16939db..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HFileSortedOplog.HFileReader.HFileSortedIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-
-/**
- * Provides a merged iterator on set of {@link HFileSortedOplog}
- */
-public class HoplogSetIterator implements HoplogIterator<ByteBuffer, ByteBuffer> {
-  private final List<HFileSortedIterator> iters;
-
-  // Number of entries remaining to be iterated by this scanner
-  private int entriesRemaining;
-
-  // points at the current iterator holding the next entry
-  private ByteBuffer currentKey;
-  private ByteBuffer currentValue;
-
-  public HoplogSetIterator(List<TrackedReference<Hoplog>> targets) throws IOException {
-    iters = new ArrayList<HFileSortedIterator>();
-    for (TrackedReference<Hoplog> oplog : targets) {
-      HFileSortedIterator iter = (HFileSortedIterator) oplog.get().getReader().scan();
-      if (!iter.hasNext()) {
-        // the oplog is empty, exclude from iterator
-        continue;
-      }
-
-      // initialize the iterator
-      iter.nextBB();
-      iters.add(iter);
-      entriesRemaining += oplog.get().getReader().getEntryCount();
-    }
-  }
-
-  public boolean hasNext() {
-    return entriesRemaining > 0;
-  }
-
-  @Override
-  public ByteBuffer next() throws IOException {
-    return nextBB();
-  }
-  public ByteBuffer nextBB() throws IOException {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
-    }
-
-    seekToMinKeyIter();
-
-    return currentKey;
-  }
-
-  private void seekToMinKeyIter() throws IOException {
-    HFileSortedIterator currentIter = null;
-    ByteBuffer minKey = null;
-
-    // scan through all hoplog iterators to reach to the iterator with smallest
-    // key on the head and remove duplicate keys
-    for (Iterator<HFileSortedIterator> iterator = iters.iterator(); iterator.hasNext();) {
-      HFileSortedIterator iter = iterator.next();
-      
-      ByteBuffer tmpK = iter.getKeyBB();
-      ByteBuffer tmpV = iter.getValueBB();
-      if (minKey == null || ByteComparator.compareBytes(tmpK.array(), tmpK.arrayOffset(), tmpK.remaining(), minKey.array(), minKey.arrayOffset(), minKey.remaining()) < 0) {
-        minKey = tmpK;
-        currentKey = tmpK;
-        currentValue = tmpV;
-        currentIter = iter;
-      } else {
-        // remove possible duplicate key entries from iterator
-        if (seekHigherKeyInIter(minKey, iter) == null) {
-          // no more keys left in this iterator
-          iter.close();
-          iterator.remove();
-        }
-      }
-    }
-    
-    //seek next key in current iter
-    if (currentIter != null && seekHigherKeyInIter(minKey, currentIter) == null) {
-      // no more keys left in this iterator
-      currentIter.close();
-      iters.remove(currentIter);
-    }
-  }
-
-  private ByteBuffer seekHigherKeyInIter(ByteBuffer key, HFileSortedIterator iter) throws IOException {
-    ByteBuffer newK = iter.getKeyBB();
-
-    // remove all duplicates by incrementing iterator when a key is less than
-    // equal to current key
-    while (ByteComparator.compareBytes(newK.array(), newK.arrayOffset(), newK.remaining(), key.array(), key.arrayOffset(), key.remaining()) <= 0) {
-      entriesRemaining--;
-      if (iter.hasNext()) {
-        newK = iter.nextBB();
-      } else {
-        newK = null;
-        break;
-      }
-    }
-    return newK;
-  }
-
-  @Override
-  public ByteBuffer getKey() {
-    return getKeyBB();
-  }
-  public ByteBuffer getKeyBB() {
-    if (currentKey == null) {
-      throw new IllegalStateException();
-    }
-    return currentKey;
-  }
-
-  @Override
-  public ByteBuffer getValue() {
-    return getValueBB();
-  }
-  public ByteBuffer getValueBB() {
-    if (currentValue == null) {
-      throw new IllegalStateException();
-    }
-    return currentValue;
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void close() {
-    for (HoplogIterator<byte[], byte[]> iter : iters) {
-      iter.close();
-    }
-  }
-
-  public int getRemainingEntryCount() {
-    return entriesRemaining;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
deleted file mode 100644
index 789a616..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Reads a sorted oplog file or a merged set of sorted oplogs.
- */
-public interface HoplogSetReader<K, V> {
-  /**
-   * Returns the value associated with the given key.
-   */
-  V read(K key) throws IOException;
-
-  /**
-   * Iterators over the entire contents of the sorted file.
-   * 
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator<K, V> scan() throws IOException;
-
-  /**
-   * Scans the available keys and allows iteration over the interval [from, to) where the starting
-   * key is included and the ending key is excluded from the results.
-   * 
-   * @param from
-   *          the start key
-   * @param to
-   *          the end key
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator<K, V> scan(K from, K to) throws IOException;
-
-  /**
-   * Scans the keys and allows iteration between the given keys.
-   * 
-   * @param from
-   *          the start key
-   * @param fromInclusive
-   *          true if the start key is included in the scan
-   * @param to
-   *          the end key
-   * @param toInclusive
-   *          true if the end key is included in the scan
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator<K, V> scan(K from, boolean fromInclusive, K to, boolean toInclusive) throws IOException;
-  
-  
-  /**
-   * Scans the available keys and allows iteration over the offset 
-   * specified as parameters
-   * 
-   * 
-   * @param startOffset
-   *          the start offset
-   * @param length
-   *          bytes to read
-   * @return the sorted iterator
-   * @throws IOException
-   */
-  HoplogIterator<K, V> scan(long startOffset, long length) throws IOException;
-
-  /**
-   * Using Cardinality estimator provides an approximate number of entries
-   * 
-   * @return the number of entries
-   */
-  long sizeEstimate();
-
-  /**
-   * Returns true if the reader has been closed.
-   * @return true if closed
-   */
-  boolean isClosed();
-
-  /**
-   * Allows sorted iteration through a set of keys and values.
-   */
-  public interface HoplogIterator<K, V> {
-    K getKey();
-
-    V getValue();
-
-    /** moves to next element and returns the key object */
-    K next() throws IOException;
-    
-    boolean hasNext();
-    
-    void close();
-    
-    void remove();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
deleted file mode 100644
index a2926ff..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-  
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-import com.gemstone.gemfire.internal.hll.ICardinality;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
-import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Reader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.internal.Version;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Implements Sequence file based {@link Hoplog}
- * 
- *
- */
-public class SequenceFileHoplog extends AbstractHoplog{
-  
-   public SequenceFileHoplog(FileSystem inputFS, Path filePath,  
-      SortedOplogStatistics stats)
-  throws IOException
-  {
-     super(inputFS, filePath, stats);
-  }
-  @Override
-  public void close() throws IOException {
-    // Nothing to do 
-  }
-
-  @Override
-  public HoplogReader getReader() throws IOException {
-    return new SequenceFileReader();
-  }
-
-  @Override
-  /**
-   * gets the writer for sequence file. 
-   * 
-   * @param keys is not used for SequenceFileHoplog class 
-   */
-  public HoplogWriter createWriter(int keys) throws IOException {
-    return new SequenceFileHoplogWriter();
-  }
-
-  @Override
-  public boolean isClosed() {
-    return false;
-  }
-  
-  @Override
-  public void close(boolean clearCache) throws IOException {
-    // Nothing to do 
-  }
-
-  /**
-   * Currently, hsync does not update the file size on namenode. So, if last time the 
-   * process died after calling hsync but before calling file close, the file is 
-   * left with an inconsistent file size. This is a workaround that - open the file stream in append 
-   * mode and close it. This fixes the file size on the namenode.
-   * 
-   * @throws IOException
-   * @return true if the file size was fixed 
-   */
-  public boolean fixFileSize() throws IOException {
-    // Try to fix the file size
-    // Loop so that the expected expceptions can be ignored 3
-   // times
-    if (logger.isDebugEnabled())
-      logger.debug("{}Fixing size of hoplog " + path, logPrefix);
-    Exception e = null;
-    boolean exceptionThrown = false;
-    for (int i =0; i < 3; i++) {
-      try {
-        FSDataOutputStream stream = fsProvider.getFS().append(path);
-        stream.close();
-        stream = null;
-      } catch (IOException ie) {
-        exceptionThrown = true;
-        e = ie;
-        if (logger.isDebugEnabled())
-        logger.debug("{}Retry run " + (i + 1) + ": Hoplog " + path + " is still a temporary " +
-            "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
-            "fix the hoplog because an exception was thrown " + e, logPrefix );
-      }
-      // As either RecoveryInProgressException was thrown or 
-      // Already being created exception was thrown, wait for 
-      // sometime before next retry. 
-      if (exceptionThrown) {
-        try {
-          Thread.sleep(5000);
-        } catch (InterruptedException e1) {
-        } 
-        exceptionThrown = false;
-      } else {
-        // no exception was thrown, break;
-        return true;
-      }
-    }
-    logger.info (logPrefix, LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + path + " is still a temporary " +
-        "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
-        "fix the hoplog because an exception was thrown " + e));
-    
-    return false;
-  }
-  
-  @Override
-  public String toString() {
-    return "SequenceFileHplog[" + getFileName() + "]";
-  }
-  
-  private class SequenceFileHoplogWriter implements HoplogWriter {
-    
-    private SequenceFile.Writer writer = null;
-    
-    public SequenceFileHoplogWriter() throws IOException{
-      writer = AbstractHoplog.getSequenceFileWriter(path, conf, logger);
-    }
-   
-    @Override
-    public void close() throws IOException {
-      writer.close();
-      if (logger.isDebugEnabled())
-        logger.debug("{}Completed creating hoplog " + path, logPrefix);
-    }
-    
-    @Override
-    public void hsync() throws IOException {
-      writer.hsyncWithSizeUpdate();
-      if (logger.isDebugEnabled())
-        logger.debug("{}hsync'ed a batch of data to hoplog " + path, logPrefix);
-    }
-    
-    @Override
-    public void append(byte[] key, byte[] value) throws IOException {
-      writer.append(new BytesWritable(key), new BytesWritable(value));
-    }
-
-    @Override
-    public void append(ByteBuffer key, ByteBuffer value) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public void close(EnumMap<Meta, byte[]> metadata) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-    @Override
-    public long getCurrentSize() throws IOException {
-      return writer.getLength();
-    }
-    
-  }
-  /**
-   * Sequence file reader. This is currently to be used only by MapReduce jobs and 
-   * test functions
-   * 
-   */
-  public class SequenceFileReader implements HoplogReader, Closeable {
-    @Override
-    public byte[] read(byte[] key) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public HoplogIterator<byte[], byte[]> scan()
-        throws IOException {
-      return  new SequenceFileIterator(fsProvider.getFS(), path, 0, Long.MAX_VALUE, conf, logger);
-    }
-
-    @Override
-    public HoplogIterator<byte[], byte[]> scan(
-        byte[] from, byte[] to) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-    
-    @Override
-    public HoplogIterator<byte[], byte[]> scan(
-        long startOffset, long length) throws IOException {
-      return  new SequenceFileIterator(fsProvider.getFS(), path, startOffset, length, conf, logger);
-    }
-    
-    @Override
-    public HoplogIterator<byte[], byte[]> scan(
-        byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive)
-        throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public boolean isClosed() {
-      throw new UnsupportedOperationException("Not supported for Sequence files.");
-    }
-    
-    @Override
-    public void close() throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files. Close the iterator instead.");
-    }
-
-    @Override
-    public ByteBuffer get(byte[] key) throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public BloomFilter getBloomFilter() throws IOException {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public long getEntryCount() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public ICardinality getCardinalityEstimator() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public long sizeEstimate() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-
-  }
-  
-  /**
-   * Sequence file iterator. This is currently to be used only by MapReduce jobs and 
-   * test functions
-   * 
-   */
-  public static class SequenceFileIterator implements HoplogIterator<byte[], byte[]> {
-    
-    SequenceFile.Reader reader = null;
-    private BytesWritable prefetchedKey = null;
-    private BytesWritable prefetchedValue = null;
-    private byte[] currentKey;
-    private byte[] currentValue;
-    boolean hasNext = false;
-    Logger logger; 
-    Path path;
-    private long start;
-    private long end;
-    
-    public SequenceFileIterator(FileSystem fs, Path path, long startOffset, 
-        long length, Configuration conf, Logger logger) 
-        throws IOException {
-      Reader.Option optPath = SequenceFile.Reader.file(path);
-      
-      // Hadoop has a configuration parameter io.serializations that is a list of serialization 
-      // classes which can be used for obtaining serializers and deserializers. This parameter 
-      // by default contains avro classes. When a sequence file is created, it calls 
-      // SerializationFactory.getSerializer(keyclass). This internally creates objects using 
-      // reflection of all the classes that were part of io.serializations. But since, there is 
-      // no avro class available it throws an exception. 
-      // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes 
-      // that are important to us. 
-      String serializations[] = conf.getStrings("io.serializations",
-          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
-      conf.setStrings("io.serializations",
-          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
-      // create reader
-      boolean emptyFile = false;
-      try {
-        reader = new SequenceFile.Reader(conf, optPath);
-      }catch (EOFException e) {
-        // this is ok as the file has ended. just return false that no more records available
-        emptyFile = true;
-      }
-      // reset the configuration to its original value 
-      conf.setStrings("io.serializations", serializations);
-      this.logger = logger;
-      this.path = path;
-      
-      if (emptyFile) {
-        hasNext = false;
-      } else {
-        // The file should be read from the first sync marker after the start position and 
-        // until the first sync marker after the end position is seen. 
-        this.end = startOffset + length;
-        if (startOffset > reader.getPosition()) {
-          reader.sync(startOffset);                  // sync to start
-        }
-        this.start = reader.getPosition();
-        this.hasNext = this.start < this.end;
-        if (hasNext)
-          readNext();
-      } 
-    }
-  
-
-    public Version getVersion(){
-      String version = reader.getMetadata().get(new Text(Meta.GEMFIRE_VERSION.name())).toString();
-      return Version.fromOrdinalOrCurrent(Short.parseShort(version)); 
-    }
-    @Override
-    public boolean hasNext() {
-      return hasNext;
-    }
-
-    @Override
-    public byte[] next() {
-      currentKey = prefetchedKey.getBytes();
-      currentValue = prefetchedValue.getBytes();
-      
-      readNext();
-
-      return currentKey;
-    }
-    
-    private void readNext() {
-      try {
-        long pos = reader.getPosition();
-        prefetchedKey = new BytesWritable();
-        prefetchedValue = new BytesWritable();
-        hasNext = reader.next(prefetchedKey, prefetchedValue);
-        // The file should be read from the first sync marker after the start position and 
-        // until the first sync marker after the end position is seen. 
-        if (pos >= end && reader.syncSeen()) {
-          hasNext = false;
-        }
-      } catch (EOFException e) {
-        // this is ok as the file has ended. just return false that no more records available
-        hasNext = false;
-      } 
-      catch (IOException e) {
-        hasNext = false;
-        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
-        throw new HDFSIOException(
-            LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
-      }
-    }
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Not supported for Sequence files");
-    }
-
-    @Override
-    public void close() {
-      IOUtils.closeStream(reader);
-    }
-
-    @Override
-    public byte[] getKey() {
-      return currentKey;
-    }
-
-    @Override
-    public byte[] getValue() {
-      return currentValue;
-    }
-    
-    /** Returns true iff the previous call to next passed a sync mark.*/
-    public boolean syncSeen() { return reader.syncSeen(); }
-
-    /** Return the current byte position in the input file. */
-    public synchronized long getPosition() throws IOException {
-      return reader.getPosition();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
deleted file mode 100644
index f5b63cc..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HDFSSplitIterator;
-
-public class AbstractGFRecordReader
-    extends
-    com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.AbstractGFRecordReader
-    implements RecordReader<GFKey, PersistedEventImpl> {
-
-  /**
-   * Initializes instance of record reader using file split and job
-   * configuration
-   * 
-   * @param split
-   * @param conf
-   * @throws IOException
-   */
-  public void initialize(CombineFileSplit split, JobConf conf) throws IOException {
-    CombineFileSplit cSplit = (CombineFileSplit) split;
-    Path[] path = cSplit.getPaths();
-    long[] start = cSplit.getStartOffsets();
-    long[] len = cSplit.getLengths();
-
-    FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
-    this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
-  }
-
-  @Override
-  public boolean next(GFKey key, PersistedEventImpl value) throws IOException {
-    /*
-     * if there are more records in the hoplog, iterate to the next record. Set
-     * key object as is. 
-     */
-
-    if (!super.hasNext()) {
-      key.setKey(null);
-      // TODO make value null;
-      return false;
-    }
-
-    super.next();
-
-    key.setKey(super.getKey().getKey());
-    PersistedEventImpl usersValue = super.getValue();
-    value.copy(usersValue);
-    return true;
-  }
-
-  @Override
-  public GFKey createKey() {
-    return new GFKey();
-  }
-
-  @Override
-  public PersistedEventImpl createValue() {
-    if(this.isSequential) {
-      return new UnsortedHoplogPersistedEvent();
-    } else {
-      return new SortedHoplogPersistedEvent();
-    }
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    // there is no efficient way to find the position of key in hoplog file.
-    return 0;
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return super.getProgressRatio();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
deleted file mode 100644
index 0e0e455..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter;
-
-public class GFInputFormat extends
-    com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFInputFormat
-    implements InputFormat<GFKey, PersistedEventImpl>, JobConfigurable {
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    this.conf = job;
-
-    Collection<FileStatus> hoplogs = getHoplogs();
-    return createSplits(job, hoplogs);
-  }
-
-  /**
-   * Creates an input split for every block occupied by hoplogs of the input
-   * regions
-   * 
-   * @param job 
-   * @param hoplogs
-   * @return array of input splits of type file input split
-   * @throws IOException
-   */
-  private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
-      throws IOException {
-    if (hoplogs == null || hoplogs.isEmpty()) {
-      return new InputSplit[0];
-    }
-
-    HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
-    List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
-    InputSplit[] splits = new InputSplit[mr2Splits.size()];
-    int i = 0;
-    for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
-      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Spit;
-      mr2Spit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;
-      
-      CombineFileSplit split = new CombineFileSplit(job, mr2Spit.getPaths(),
-          mr2Spit.getStartOffsets(), mr2Spit.getLengths(),
-          mr2Spit.getLocations());
-      splits[i] = split;
-      i++;
-    }
-
-    return splits;
-  }
-
-  @Override
-  public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
-      InputSplit split, JobConf job, Reporter reporter) throws IOException {
-
-    CombineFileSplit cSplit = (CombineFileSplit) split;
-    AbstractGFRecordReader reader = new AbstractGFRecordReader();
-    reader.initialize(cSplit, job);
-    return reader;
-  }
-
-  @Override
-  public void configure(JobConf job) {
-    this.conf = job;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
deleted file mode 100644
index 1494e9f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.client.ClientCache;
-
-/**
- * Output format for gemfire. The records provided to writers created by this
- * output format are PUT in a live gemfire cluster.
- * 
- */
-public class GFOutputFormat extends
-    com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFOutputFormat
-    implements OutputFormat<Object, Object> {
-
-  @Override
-  public RecordWriter<Object, Object> getRecordWriter(
-      FileSystem ignored, JobConf job, String name, Progressable progress)
-      throws IOException {
-    ClientCache cache = getClientCacheInstance(job);
-    return new GFRecordWriter(cache, job);
-  }
-  
-  @Override
-  public void checkOutputSpecs(FileSystem ignored, JobConf job)
-      throws IOException {
-    validateConfiguration(job);
-  }
-
-  public class GFRecordWriter implements RecordWriter<Object, Object> {
-    private ClientCache clientCache;
-    private Region<Object, Object> region;
-
-    public GFRecordWriter(ClientCache cache, Configuration conf) {
-      this.clientCache = cache;
-      region = getRegionInstance(conf, clientCache);
-    }
-    
-    @Override
-    public void write(Object key, Object value) throws IOException {
-      executePut(region, key, value);
-    }
-
-    @Override
-    public void close(Reporter reporter) throws IOException {
-      closeClientCache(clientCache);
-      // TODO update reporter
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
deleted file mode 100644
index 2c71b18..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-
-public class AbstractGFRecordReader extends
-    RecordReader<GFKey, PersistedEventImpl> {
-
-  // constant overhead of each KV in hfile. This is used in computing the
-  // progress of record reader
-  protected long RECORD_OVERHEAD = 8;
-
-  // accounting for number of bytes already read from the hfile
-  private long bytesRead;
-  
-  protected boolean isSequential;
-  
-  protected HDFSSplitIterator splitIterator;
-
-  @Override
-  public void initialize(InputSplit split, TaskAttemptContext context)
-  throws IOException, InterruptedException {
-    CombineFileSplit cSplit = (CombineFileSplit) split;
-    Path[] path = cSplit.getPaths();
-    long[] start = cSplit.getStartOffsets();
-    long[] len = cSplit.getLengths();
-
-    Configuration conf = context.getConfiguration();
-    FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
-    
-    this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
-  }
-  
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return next();
-  }
-
-  protected boolean next() throws IOException {
-    if (!hasNext()) {
-      return false;
-    }
-    
-    splitIterator.next();
-    bytesRead += (splitIterator.getKey().length + splitIterator.getValue().length);
-    bytesRead += RECORD_OVERHEAD;
-    return true;
-  }
-  
-  protected boolean hasNext() throws IOException {
-    return splitIterator.hasNext();
-  }
-
-  @Override
-  public GFKey getCurrentKey() throws IOException, InterruptedException {
-    return getKey();
-  }
-
-  protected GFKey getKey() throws IOException {
-    try {
-      GFKey key = new GFKey();
-      key.setKey(BlobHelper.deserializeBlob(splitIterator.getKey()));
-      return key;
-    } catch (ClassNotFoundException e) {
-      // TODO resolve logging
-      return null;
-    }
-  }
-
-  @Override
-  public PersistedEventImpl getCurrentValue() throws IOException,
-      InterruptedException {
-    return getValue();
-  }
-
-  protected PersistedEventImpl getValue() throws IOException {
-    try {
-      byte[] valueBytes = splitIterator.getValue();
-      if(isSequential) {
-        return UnsortedHoplogPersistedEvent.fromBytes(valueBytes);
-      } else {
-        return SortedHoplogPersistedEvent.fromBytes(valueBytes);
-      }
-    } catch (ClassNotFoundException e) {
-      // TODO resolve logging
-      return null;
-    }
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return getProgressRatio();
-  }
-
-  protected float getProgressRatio() throws IOException {
-    if (!splitIterator.hasNext()) {
-      return 1.0f;
-    } else if (bytesRead > splitIterator.getLength()) {
-      // the record reader is expected to read more number of bytes as it
-      // continues till beginning of next block. hence if extra reading has
-      // started return fixed value
-      return 0.95f;
-    } else {
-      return Math.min(1.0f, bytesRead / (float) (splitIterator.getLength()));
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    splitIterator.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
deleted file mode 100644
index ff64ceb..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter;
-
-public class GFInputFormat extends InputFormat<GFKey, PersistedEventImpl>
-    implements Configurable {
-  public static final String HOME_DIR = "mapreduce.input.gfinputformat.homedir";
-  public static final String INPUT_REGION = "mapreduce.input.gfinputformat.inputregion";
-  public static final String START_TIME = "mapreduce.input.gfinputformat.starttime";
-  public static final String END_TIME = "mapreduce.input.gfinputformat.endtime";
-  public static final String CHECKPOINT = "mapreduce.input.gfinputformat.checkpoint";
-  
-  protected Configuration conf;
-
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-    this.conf = job.getConfiguration();
-    
-    Collection<FileStatus> hoplogs = getHoplogs();
-    return createSplits(hoplogs);
-  }
-
-  /**
-   * Identifies filters provided in the job configuration and creates a list of
-   * sorted hoplogs. If there are no sorted hoplogs, checks if the region has
-   * sequential hoplogs
-   * 
-   * @return list of hoplogs
-   * @throws IOException
-   */
-  protected Collection<FileStatus> getHoplogs() throws IOException {
-    String regionName = conf.get(INPUT_REGION);
-    System.out.println("GFInputFormat: Region Name is " + regionName);
-    if (regionName == null || regionName.trim().isEmpty()) {
-      // incomplete job configuration, region name must be provided
-      return new ArrayList<FileStatus>();
-    }
-
-    String home = conf.get(HOME_DIR, HDFSStore.DEFAULT_HOME_DIR);
-    regionName = HdfsRegionManager.getRegionFolder(regionName);
-    Path regionPath = new Path(home + "/" + regionName);
-    FileSystem fs = regionPath.getFileSystem(conf);
-
-    long start = conf.getLong(START_TIME, 0l);
-    long end = conf.getLong(END_TIME, 0l);
-    boolean checkpoint = conf.getBoolean(CHECKPOINT, true);
-
-    // if the region contains flush hoplogs then the region is of type RW.
-    Collection<FileStatus> hoplogs;
-    hoplogs = HoplogUtil.filterHoplogs(fs, regionPath, start, end, checkpoint);
-    return hoplogs == null ? new ArrayList<FileStatus>() : hoplogs;
-  }
-  
-  /**
-   * Creates an input split for every block occupied by hoplogs of the input
-   * regions
-   * 
-   * @param hoplogs
-   * @return list of input splits of type file input split
-   * @throws IOException
-   */
-  private List<InputSplit> createSplits(Collection<FileStatus> hoplogs)
-      throws IOException {
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-    if (hoplogs == null || hoplogs.isEmpty()) {
-      return splits;
-    }
-    
-    HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
-    return splitter.getOptimizedSplits(conf);
-  }
-
-  @Override
-  public RecordReader<GFKey, PersistedEventImpl> createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    return new AbstractGFRecordReader();
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java
deleted file mode 100644
index 5bba2c7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-import com.gemstone.gemfire.internal.util.BlobHelper;
-
-public class GFKey implements WritableComparable<GFKey> {
-  private Object key;
-
-  public Object getKey() {
-    return key;
-  }
-
-  public void setKey(Object key) {
-    this.key = key;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    byte[] bytes = BlobHelper.serializeToBlob(key);
-    out.writeInt(bytes.length);
-    out.write(bytes, 0, bytes.length);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int len = in.readInt();
-    byte[] bytes = new byte[len];
-    in.readFully(bytes, 0, len);
-    try {
-      key = BlobHelper.deserializeBlob(bytes);
-    } catch (ClassNotFoundException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public int compareTo(GFKey o) {
-    try {
-      byte[] b1 = BlobHelper.serializeToBlob(key);
-      byte[] b2 = BlobHelper.serializeToBlob(o.key);
-      return WritableComparator.compareBytes(b1, 0, b1.length, b2, 0, b2.length);
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java
deleted file mode 100644
index 3be2ab0..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionExistsException;
-import com.gemstone.gemfire.cache.client.ClientCache;
-import com.gemstone.gemfire.cache.client.ClientCacheFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.management.internal.cli.converters.ConnectionEndpointConverter;
-
-/**
- * Output format for gemfire. The records provided to writers created by this
- * output format are PUT in a live gemfire cluster.
- * 
- */
-public class GFOutputFormat extends OutputFormat<Object, Object> {
-  public static final String REGION = "mapreduce.output.gfoutputformat.outputregion";
-  public static final String LOCATOR_HOST = "mapreduce.output.gfoutputformat.locatorhost";
-  public static final String LOCATOR_PORT = "mapreduce.output.gfoutputformat.locatorport";
-  public static final String SERVER_HOST = "mapreduce.output.gfoutputformat.serverhost";
-  public static final String SERVER_PORT = "mapreduce.output.gfoutputformat.serverport";
-
-  @Override
-  public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    ClientCache cache = getClientCacheInstance(conf);
-    return new GFRecordWriter(cache, context.getConfiguration());
-  }
-
-  public ClientCache getClientCacheInstance(Configuration conf) {
-    // if locator host is provided create a client cache instance using
-    // connection to locator. If locator is not provided and server host is also
-    // not provided, connect using default locator
-    ClientCache cache;
-    String serverHost = conf.get(SERVER_HOST);
-    if (serverHost == null || serverHost.isEmpty()) {
-      cache = createGFWriterUsingLocator(conf);
-    } else {
-      cache = createGFWriterUsingServer(conf);
-    }
-    return cache;
-  }
-
-  /**
-   * Creates instance of {@link ClientCache} by connecting to GF cluster through
-   * locator
-   */
-  public ClientCache createGFWriterUsingLocator(Configuration conf) {
-    // if locator host is not provided assume localhost
-    String locator = conf.get(LOCATOR_HOST,
-        ConnectionEndpointConverter.DEFAULT_LOCATOR_HOST);
-    // if locator port is not provided assume default locator port 10334
-    int port = conf.getInt(LOCATOR_PORT,
-        ConnectionEndpointConverter.DEFAULT_LOCATOR_PORT);
-
-    // create gemfire client cache instance
-    ClientCacheFactory ccf = new ClientCacheFactory();
-    ccf.addPoolLocator(locator, port);
-    ClientCache cache = ccf.create();
-    return cache;
-  }
-
-  /**
-   * Creates instance of {@link ClientCache} by connecting to GF cluster through
-   * GF server
-   */
-  public ClientCache createGFWriterUsingServer(Configuration conf) {
-    String server = conf.get(SERVER_HOST);
-    // if server port is not provided assume default server port, 40404
-    int port = conf.getInt(SERVER_PORT, CacheServer.DEFAULT_PORT);
-
-    // create gemfire client cache instance
-    ClientCacheFactory ccf = new ClientCacheFactory();
-    ccf.addPoolServer(server, port);
-    ClientCache cache = ccf.create();
-    return cache;
-  }
-
-  public Region<Object, Object> getRegionInstance(Configuration conf,
-      ClientCache cache) {
-    Region<Object, Object> region;
-
-    // create gemfire region in proxy mode
-    String regionName = conf.get(REGION);
-    ClientRegionFactory<Object, Object> regionFactory = cache
-        .createClientRegionFactory(ClientRegionShortcut.PROXY);
-    try {
-      region = regionFactory.create(regionName);
-    } catch (RegionExistsException e) {
-      region = cache.getRegion(regionName);
-    }
-
-    return region;
-  }
-
-  /**
-   * Puts a K-V pair in region
-   * @param region
-   * @param key
-   * @param value
-   */
-  public void executePut(Region<Object, Object> region, Object key, Object value) {
-    region.put(key, value);
-  }
-
-  /**
-   * Closes client cache instance
-   * @param clientCache
-   */
-  public void closeClientCache(ClientCache clientCache) {
-    if (clientCache != null && !clientCache.isClosed()) {
-      clientCache.close();
-    }
-  }
-
-  /**
-   * Validates correctness and completeness of job's output configuration
-   * 
-   * @param conf
-   * @throws InvalidJobConfException
-   */
-  protected void validateConfiguration(Configuration conf)
-      throws InvalidJobConfException {
-    // User must configure the output region name.
-    String region = conf.get(REGION);
-    if (region == null || region.trim().isEmpty()) {
-      throw new InvalidJobConfException("Output Region name not provided.");
-    }
-
-    // TODO validate if a client connected to gemfire cluster can be created
-  }
-  
-  @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
-    Configuration conf = context.getConfiguration();
-    validateConfiguration(conf);
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
-        context);
-  }
-
-  public class GFRecordWriter extends RecordWriter<Object, Object> {
-    private ClientCache clientCache;
-    private Region<Object, Object> region;
-
-    public GFRecordWriter(ClientCache cache, Configuration conf) {
-      this.clientCache = cache;
-      region = getRegionInstance(conf, clientCache);
-    }
-
-    @Override
-    public void write(Object key, Object value) throws IOException,
-        InterruptedException {
-      executePut(region, key, value);
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      closeClientCache(clientCache);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
deleted file mode 100644
index 869ad0d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * Iterates over the records in part of a hoplog. This iterator
- * is passed from the map reduce job into the gemfirexd LanguageConnectionContext
- * for gemfirexd to use as the iterator during the map phase.
- *
- */
-public abstract class HDFSSplitIterator {
-  // data object for holding path, offset and length, of all the blocks this
-  // iterator needs to iterate on
-  private CombineFileSplit split;
-
-  // the following members are pointers to current hoplog which is being
-  // iterated upon
-  private int currentHopIndex = 0;
-  private AbstractHoplog hoplog;
-  protected HoplogIterator<byte[], byte[]> iterator;
-  byte[] key;
-  byte[] value;
-  
-  private long bytesRead;
-  protected long RECORD_OVERHEAD = 8;
-
-  private long startTime = 0l;
-  private long endTime = 0l;
-
-  protected FileSystem fs;
-  private static final Logger logger = LogService.getLogger();
-  protected final String logPrefix = "<" + "HDFSSplitIterator" + "> ";
-
-  public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets, long[] lengths, long startTime, long endTime) throws IOException {
-    this.fs = fs;
-    this.split = new CombineFileSplit(paths, offsets, lengths, null);
-    while(currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))){
-      logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex)));
-      currentHopIndex++;
-    }
-    if(currentHopIndex == split.getNumPaths()){
-      this.hoplog = null;
-      iterator = null;
-    } else {
-      this.hoplog = getHoplog(fs,split.getPath(currentHopIndex));
-      iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
-    }
-    this.startTime = startTime;
-    this.endTime = endTime;
-  }
-
-  /**
-   * Get the appropriate iterator for the file type.
-   */
-  public static HDFSSplitIterator newInstance(FileSystem fs, Path[] path,
-      long[] start, long[] len, long startTime, long endTime)
-      throws IOException {
-    String fileName = path[0].getName();
-    if (fileName.endsWith(AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION)) {
-      return new StreamSplitIterator(fs, path, start, len, startTime, endTime);
-    } else {
-      return new RWSplitIterator(fs, path, start, len, startTime, endTime);
-    }
-  }
-
-  public final boolean hasNext() throws IOException {
-    while (currentHopIndex < split.getNumPaths()) {
-      if (iterator != null) {
-        if(iterator.hasNext()) {
-          return true;
-        } else {
-          iterator.close();
-          iterator = null;
-          hoplog.close();
-          hoplog = null;
-        }
-      }
-      
-      if (iterator == null) {
-        // Iterator is null if this is first read from this iterator or all the
-        // entries from the previous iterator have been read. create iterator on
-        // the next hoplog.
-        currentHopIndex++;
-        while (currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))){
-          logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex).toString()));
-          currentHopIndex++;
-        }
-        if (currentHopIndex >= split.getNumPaths()) {
-          return false;
-        }
-        hoplog = getHoplog(fs, split.getPath(currentHopIndex));
-        iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
-      }
-    }
-    
-    return false;
-  } 
-
-  public final boolean next() throws IOException {
-    while (hasNext()) {
-      key = iterator.next();
-      value = iterator.getValue();
-      bytesRead += (key.length + value.length);
-      bytesRead += RECORD_OVERHEAD;
-      
-      // if any filter is set, check if the event's timestamp matches the
-      // filter. The events returned by the iterator may not be time ordered. So
-      // it is important to check filters everytime.
-      if (startTime > 0 || endTime > 0) {
-        try {
-          PersistedEventImpl event = getDeserializedValue();
-          long timestamp = event.getTimstamp();
-          if (startTime > 0l && timestamp < startTime) {
-            continue;
-          }
-          
-          if (endTime > 0l && timestamp > endTime) {
-            continue;
-          }
-        } catch (ClassNotFoundException e) {
-          throw new HDFSIOException("Error reading from HDFS", e);
-        } 
-      }
-        
-      return true;
-    }
-    
-    return false;
-  }
-
-  public final long getBytesRead() {
-    return this.bytesRead;
-  }
-
-  public final byte[] getKey() {
-    return key;
-  }
-
-  public abstract PersistedEventImpl getDeserializedValue()
-      throws ClassNotFoundException, IOException;
-
-  protected abstract AbstractHoplog getHoplog(FileSystem fs, Path path)
-      throws IOException;
-
-  public final byte[] getValue() {
-    return value;
-  }
-
-  public final long getLength() {
-    return split.getLength();
-  }
-
-  public void close() throws IOException {
-    if (iterator != null) {
-      iterator.close();
-      iterator = null;
-    }
-    
-    if (hoplog != null) {
-      hoplog.close();
-      hoplog.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
deleted file mode 100644
index c4c0d1c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-
-public class HoplogUtil {
-  /**
-   * @param regionPath
-   *          HDFS path of the region
-   * @param fs
-   *          file system associated with the region
-   * @param type
-   *          type of hoplog to be fetched; flush hoplog or sequence hoplog
-   * @return All hoplog file paths belonging to the region provided
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getAllRegionHoplogs(Path regionPath,
-      FileSystem fs, String type) throws IOException {
-    return getRegionHoplogs(regionPath, fs, type, 0, 0);
-  }
-
-  /**
-   * @param regionPath
-   *          Region path
-   * @param fs
-   *          file system associated with the region
-   * @param type
-   *          type of hoplog to be fetched; flush hoplog or sequence hoplog
-   * @param start
-   *          Exclude files that do not contain records mutated after start time
-   * @param end
-   *          Exclude files that do not contain records mutated before end time
-   * @return All hoplog file paths belonging to the region provided
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getRegionHoplogs(Path regionPath,
-      FileSystem fs, String type, long start, long end) throws IOException {
-    Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
-        regionPath, fs, type, start, end);
-
-    ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
-    for (Collection<FileStatus> bucket : allBuckets) {
-      for (FileStatus file : bucket) {
-        hoplogs.add(file);
-      }
-    }
-    return hoplogs;
-  }
-
-  public static Collection<Collection<FileStatus>> getBucketHoplogs(Path regionPath,
-      FileSystem fs, String type, long start, long end) throws IOException {
-    Collection<Collection<FileStatus>> allBuckets = new ArrayList<Collection<FileStatus>>();
-
-    // hoplog files names follow this pattern
-    String HOPLOG_NAME_REGEX = AbstractHoplogOrganizer.HOPLOG_NAME_REGEX + type;
-    String EXPIRED_HOPLOG_NAME_REGEX = HOPLOG_NAME_REGEX + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-    final Pattern pattern = Pattern.compile(HOPLOG_NAME_REGEX);
-    final Pattern expiredPattern = Pattern.compile(EXPIRED_HOPLOG_NAME_REGEX);
-    
-    Path cleanUpIntervalPath = new Path(regionPath.getParent(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
-    long intervalDurationMillis = readCleanUpIntervalMillis(fs, cleanUpIntervalPath);
-
-    // a region directory contains directories for individual buckets. A bucket
-    // has a integer name.
-    FileStatus[] bucketDirs = fs.listStatus(regionPath);
-    
-    for (FileStatus bucket : bucketDirs) {
-      if (!bucket.isDirectory()) {
-        continue;
-      }
-      try {
-        Integer.valueOf(bucket.getPath().getName());
-      } catch (NumberFormatException e) {
-        continue;
-      }
-
-      ArrayList<FileStatus> bucketHoplogs = new ArrayList<FileStatus>();
-
-      // identify all the flush hoplogs and seq hoplogs by visiting all the
-      // bucket directories
-      FileStatus[] bucketFiles = fs.listStatus(bucket.getPath());
-      
-      Map<String, Long> expiredHoplogs = getExpiredHoplogs(fs, bucketFiles, expiredPattern);
-      
-      FileStatus oldestHopAfterEndTS = null;
-      long oldestHopTS = Long.MAX_VALUE;
-      long currentTimeStamp = System.currentTimeMillis();
-      for (FileStatus file : bucketFiles) {
-        if (!file.isFile()) {
-          continue;
-        }
-
-        Matcher match = pattern.matcher(file.getPath().getName());
-        if (!match.matches()) {
-          continue;
-        }
-        
-        long timeStamp = AbstractHoplogOrganizer.getHoplogTimestamp(match);
-        if (start > 0 && timeStamp < start) {
-          // this hoplog contains records less than the start time stamp
-          continue;
-        }
-
-        if (end > 0 && timeStamp > end) {
-          // this hoplog contains records mutated after end time stamp. Ignore
-          // this hoplog if it is not the oldest.
-          if (oldestHopTS > timeStamp) {
-            oldestHopTS = timeStamp;
-            oldestHopAfterEndTS = file;
-          }
-          continue;
-        }
-        long expiredTimeStamp = expiredTime(file, expiredHoplogs);
-        if (expiredTimeStamp > 0 && intervalDurationMillis > 0) {
-          if ((currentTimeStamp - expiredTimeStamp) > 0.8 * intervalDurationMillis) {
-            continue;
-          }
-        }
-        bucketHoplogs.add(file);
-      }
-
-      if (oldestHopAfterEndTS != null) {
-        long expiredTimeStamp = expiredTime(oldestHopAfterEndTS, expiredHoplogs);
-        if (expiredTimeStamp <= 0 || intervalDurationMillis <=0  || 
-            (currentTimeStamp - expiredTimeStamp) <= 0.8 * intervalDurationMillis) {
-          bucketHoplogs.add(oldestHopAfterEndTS);
-        }
-      }
-
-      if (bucketHoplogs.size() > 0) {
-        allBuckets.add(bucketHoplogs);
-      }
-    }
-    
-    return allBuckets;
-  }
-  
-  private static Map<String, Long> getExpiredHoplogs(FileSystem fs, FileStatus[] bucketFiles, 
-      Pattern expiredPattern) throws IOException{
-    Map<String, Long> expiredHoplogs = new HashMap<String,Long>();
-    
-    for(FileStatus file : bucketFiles) {
-      if(!file.isFile()) {
-        continue;
-      }
-      String fileName = file.getPath().getName();
-      Matcher match = expiredPattern.matcher(fileName);
-      if (!match.matches()){
-        continue;
-      }
-      expiredHoplogs.put(fileName,file.getModificationTime());
-    }
-    return expiredHoplogs;
-  }
-  
-  private static long expiredTime(FileStatus file, Map<String, Long> expiredHoplogs){
-    String expiredMarkerName = file.getPath().getName() + 
-        AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-    
-    long expiredTimeStamp = -1;
-    if (expiredHoplogs.containsKey(expiredMarkerName)) {
-      expiredTimeStamp = expiredHoplogs.get(expiredMarkerName);
-    }
-    return expiredTimeStamp;
-  }
-  
-  public static long readCleanUpIntervalMillis(FileSystem fs, Path cleanUpIntervalPath) throws IOException{
-    if (fs.exists(cleanUpIntervalPath)) {
-      FSDataInputStream input = new FSDataInputStream(fs.open(cleanUpIntervalPath));
-      long intervalDurationMillis = input.readLong();
-      input.close();
-      return intervalDurationMillis;
-    } else {
-      return -1l;
-    }
-  }
-  
-  public static void exposeCleanupIntervalMillis(FileSystem fs, Path path, long intervalDurationMillis){
-    FSDataInputStream input = null;
-    FSDataOutputStream output = null;
-    try {
-      if(fs.exists(path)){
-        input = new FSDataInputStream(fs.open(path));
-        if (intervalDurationMillis == input.readLong()) {
-          input.close();
-          return;
-        }
-        input.close();
-        fs.delete(path, true);
-      } 
-      output = fs.create(path);
-      output.writeLong(intervalDurationMillis);
-      output.close();
-    } catch (IOException e) {
-      return;
-    } finally {
-      try {
-        if (input != null){
-          input.close();
-        }
-        if (output != null) {
-          output.close();
-        }
-      } catch(IOException e2) {
-        
-      } 
-    }
-  }
-
-  /**
-   * @param regionPath
-   * @param fs
-   * @return list of latest checkpoint files of all buckets in the region
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getCheckpointFiles(Path regionPath,
-      FileSystem fs) throws IOException {
-    ArrayList<FileStatus> latestSnapshots = new ArrayList<FileStatus>();
-
-    Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
-        regionPath, fs, AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, 0, 0);
-
-    // extract the latest major compacted hoplog from each bucket
-    for (Collection<FileStatus> bucket : allBuckets) {
-      FileStatus latestSnapshot = null;
-      for (FileStatus file : bucket) {
-        if (latestSnapshot == null) {
-          latestSnapshot = file;
-        } else {
-          String name1 = latestSnapshot.getPath().getName();
-          String name2 = file.getPath().getName();
-          
-          if (HoplogComparator.compareByName(name1, name2) > 0) {
-            latestSnapshot = file;
-          }
-        }
-      }
-      
-      if (latestSnapshot != null) {
-        latestSnapshots.add(latestSnapshot);
-      }
-    }
-
-    return latestSnapshots;
-  }
-  
-  /**
-   * Creates a mapping of hoplog to hdfs blocks on disk
-   * 
-   * @param files
-   *          list of hoplog file status objects
-   * @return array of hdfs block location objects associated with a hoplog
-   * @throws IOException
-   */
-  public static Map<FileStatus, BlockLocation[]> getBlocks(Configuration config,
-      Collection<FileStatus> files) throws IOException {
-    Map<FileStatus, BlockLocation[]> blocks = new HashMap<FileStatus, BlockLocation[]>();
-    if (files == null || files.isEmpty()) {
-      return blocks;
-    }
-
-    FileSystem fs = files.iterator().next().getPath().getFileSystem(config);
-
-    for (FileStatus hoplog : files) {
-      long length = hoplog.getLen();
-      BlockLocation[] fileBlocks = fs.getFileBlockLocations(hoplog, 0, length);
-      blocks.put(hoplog, fileBlocks);
-    }
-
-    return blocks;
-  }
-  
-  /**
-   * Filters out hoplogs of a region that do not match time filters and creates
-   * a list of hoplogs that may be used by hadoop jobs.
-   * 
-   * @param fs
-   *          file system instance
-   * @param path
-   *          region path
-   * @param start
-   *          start time in milliseconds
-   * @param end
-   *          end time in milliseconds
-   * @param snapshot
-   *          if true latest snapshot hoplog will be included in the final
-   *          return list
-   * @return filtered collection of hoplogs
-   * @throws IOException
-   */
-  public static Collection<FileStatus> filterHoplogs(FileSystem fs, Path path,
-      long start, long end, boolean snapshot) throws IOException {
-    ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
-
-    // if the region contains flush hoplogs or major compacted files then the
-    // region is of type RW.
-    // check if the intent is to operate on major compacted files only
-    if (snapshot) {
-      hoplogs.addAll(getCheckpointFiles(path, fs));
-    } else {
-      hoplogs.addAll(getRegionHoplogs(path, fs,
-          AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, start, end));
-    }
-
-    if (hoplogs == null || hoplogs.isEmpty()) {
-      // there are no sorted hoplogs. Check if sequence hoplogs are present
-      // there is no checkpoint mode for write only tables
-      hoplogs.addAll(getRegionHoplogs(path, fs,
-          AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION, start, end));
-    }
-
-    return hoplogs == null ? new ArrayList<FileStatus>() : hoplogs;
-  }
-  
-  private HoplogUtil() {
-    //static methods only.
-  }
-  
-  /**
-   * This class creates MR splits from hoplog files. This class leverages
-   * CombineFileInputFormat to create locality, node and rack, aware splits
-   * 
-   */
-  public static class HoplogOptimizedSplitter extends CombineFileInputFormat<Long, Long> {
-    private Collection<FileStatus> hoplogs;
-
-    public HoplogOptimizedSplitter(Collection<FileStatus> hoplogs) {
-      this.hoplogs = hoplogs;
-    }
-    
-    @Override
-    protected List<FileStatus> listStatus(JobContext job) throws IOException {
-      /**
-       * listStatus in super collects fileStatus for each file again. It also
-       * tries to recursively list files in subdirectories. None of this is
-       * applicable in this case. Splitter has already collected fileStatus for
-       * all files. So bypassing super's method will improve performance as NN
-       * chatter will be reduced. Specially helpful if NN is not colocated.
-       */
-      return new ArrayList<FileStatus>(hoplogs);
-    }
-    
-    /**
-     * Creates an array of splits for the input list of hoplogs. Each split is
-     * roughly the size of an hdfs block. Hdfs blocks of a hoplog may be smaller
-     * than hdfs block size, for e.g. if the hoplog is very small. The method
-     * keeps adding hdfs blocks of a hoplog to a split till the split is less
-     * than hdfs block size and the block is local to the split.
-     */
-    public List<InputSplit> getOptimizedSplits(Configuration conf) throws IOException {
-      
-      if (hoplogs == null || hoplogs.isEmpty()) {
-        return null;
-      }
-      Path[] paths = new Path[hoplogs.size()];
-      int i = 0;
-      for (FileStatus file : hoplogs) {
-        paths[i] = file.getPath();
-        i++;
-      }
-
-      FileStatus hoplog = hoplogs.iterator().next();
-      long blockSize = hoplog.getBlockSize();
-      setMaxSplitSize(blockSize);
-
-      Job job = Job.getInstance(conf);
-      setInputPaths(job, paths);
-      List<InputSplit> splits = super.getSplits(job);
-      
-      // in some cases a split may not get populated with host location
-      // information. If such a split is created, fill location information of
-      // the first file in the split
-      ArrayList<CombineFileSplit> newSplits = new ArrayList<CombineFileSplit>();
-      for (Iterator<InputSplit> iter = splits.iterator(); iter.hasNext();) {
-        CombineFileSplit split = (CombineFileSplit) iter.next();
-        if (split.getLocations() != null && split.getLocations().length > 0) {
-          continue;
-        }
-        
-        paths = split.getPaths();
-        if (paths.length == 0) {
-          continue;
-        }
-        long[] starts = split.getStartOffsets();
-        long[] ends = split.getLengths();
-        
-        FileSystem fs = paths[0].getFileSystem(conf);
-        FileStatus file = fs.getFileStatus(paths[0]);
-        BlockLocation[] blks = fs.getFileBlockLocations(file, starts[0], ends[0]);
-        if (blks != null && blks.length > 0) {
-          // hosts found. Need to create a new split and replace the one missing
-          // hosts.
-          iter.remove();
-          String hosts[] = blks[0].getHosts();
-          split = new CombineFileSplit(paths, starts, ends, hosts);
-          newSplits.add(split);
-        }
-      }
-      splits.addAll(newSplits);
-      
-      return splits;
-    }
-    
-    @Override
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      // a call to this method is invalid. This class is only meant to create
-      // optimized splits independent of the api type
-      throw new IllegalStateException();
-    }
-
-    @Override
-    public RecordReader<Long, Long> createRecordReader(InputSplit split,
-        TaskAttemptContext arg1) throws IOException {
-      // Record reader creation is managed by GFInputFormat. This method should
-      // not be called
-      throw new IllegalStateException();
-    }
-  }
-}