Posted to blur-commits@incubator.apache.org by am...@apache.org on 2016/08/30 01:57:53 UTC

[08/13] git commit: Adding blur indexer project.

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
deleted file mode 100644
index 0e2fe66..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.manager.BlurPartitioner;
-import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.FetchRecordResult;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.RowDocumentUtil;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.AtomicReader;
-import org.apache.lucene.index.AtomicReaderContext;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import com.google.common.io.Closer;
-
-public class MapperForExistingDataWithIndexLookup extends Mapper<Text, BooleanWritable, IndexKey, IndexValue> {
-
-  private static final Log LOG = LogFactory.getLog(MapperForExistingDataWithIndexLookup.class);
-
-  private static final String BLUR_SNAPSHOT = "blur.snapshot";
-  private Counter _existingRecords;
-  private Counter _rowLookup;
-  private BlurPartitioner _blurPartitioner;
-  private Path _tablePath;
-  private int _numberOfShardsInTable;
-  private Configuration _configuration;
-  private String _snapshot;
-
-  private int _indexShard = -1;
-  private DirectoryReader _reader;
-  private IndexSearcher _indexSearcher;
-  private long _totalNumberOfBytes;
-  private Closer _closer;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    Counter counter = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER);
-    counter.increment(1);
-
-    _configuration = context.getConfiguration();
-    _existingRecords = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_EXISTING_RECORDS);
-    _rowLookup = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT);
-    _blurPartitioner = new BlurPartitioner();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
-    _numberOfShardsInTable = tableDescriptor.getShardCount();
-    _tablePath = new Path(tableDescriptor.getTableUri());
-    _snapshot = getSnapshot(_configuration);
-    _totalNumberOfBytes = _configuration.getLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
-    _closer = Closer.create();
-  }
-
-  @Override
-  protected void map(Text key, BooleanWritable value, Context context) throws IOException, InterruptedException {
-    if (value.get()) {
-      String rowId = key.toString();
-      LOG.debug("Looking up rowid [" + rowId + "]");
-      _rowLookup.increment(1);
-      IndexSearcher indexSearcher = getIndexSearcher(rowId);
-      Term term = new Term(BlurConstants.ROW_ID, rowId);
-      RowCollector collector = getCollector(context);
-      indexSearcher.search(new TermQuery(term), collector);
-      LOG.debug("Looking for rowid [" + rowId + "] has [" + collector.records + "] records");
-    }
-  }
-
-  @Override
-  protected void cleanup(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) throws IOException,
-      InterruptedException {
-    _closer.close();
-  }
-
-  static class RowCollector extends Collector {
-
-    private AtomicReader reader;
-    private Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context _context;
-    private Counter _existingRecords;
-    int records;
-
-    RowCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context, Counter existingRecords) {
-      _context = context;
-      _existingRecords = existingRecords;
-    }
-
-    @Override
-    public void setScorer(Scorer scorer) throws IOException {
-
-    }
-
-    @Override
-    public void setNextReader(AtomicReaderContext context) throws IOException {
-      reader = context.reader();
-    }
-
-    @Override
-    public void collect(int doc) throws IOException {
-      records++; // count hits so the debug log in map(...) reports a real value
-      Document document = reader.document(doc);
-      FetchRecordResult result = RowDocumentUtil.getRecord(document);
-      String rowid = result.getRowid();
-      Record record = result.getRecord();
-      String recordId = record.getRecordId();
-      IndexKey oldDataKey = IndexKey.oldData(rowid, recordId);
-      try {
-        _context.write(oldDataKey, new IndexValue(toBlurRecord(rowid, record)));
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-      _existingRecords.increment(1L);
-    }
-
-    private BlurRecord toBlurRecord(String rowId, Record record) {
-      BlurRecord blurRecord = new BlurRecord();
-      blurRecord.setRowId(rowId);
-      blurRecord.setRecordId(record.getRecordId());
-      blurRecord.setFamily(record.getFamily());
-      List<Column> columns = record.getColumns();
-      for (Column column : columns) {
-        blurRecord.addColumn(column.getName(), column.getValue());
-      }
-      return blurRecord;
-    }
-
-    @Override
-    public boolean acceptsDocsOutOfOrder() {
-      return false;
-    }
-  }
-
-  private RowCollector getCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) {
-    return new RowCollector(context, _existingRecords);
-  }
-
-  private IndexSearcher getIndexSearcher(String rowId) throws IOException {
-    int shard = _blurPartitioner.getShard(rowId, _numberOfShardsInTable);
-    if (_indexSearcher != null) {
-      if (shard != _indexShard) {
-        throw new IOException("Input data is not partitioned correctly.");
-      }
-      return _indexSearcher;
-    } else {
-      _indexShard = shard;
-      Path shardPath = new Path(_tablePath, ShardUtil.getShardName(_indexShard));
-      HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
-      SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
-          SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
-      Long generation = policy.getGeneration(_snapshot);
-      if (generation == null) {
-        hdfsDirectory.close();
-        throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
-      }
-
-      BlurConfiguration bc = new BlurConfiguration();
-      BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
-          _totalNumberOfBytes);
-      _closer.register(blockCacheDirectoryFactoryV2);
-      Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
-
-      List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
-      IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardPath);
-      _reader = DirectoryReader.open(indexCommit);
-      return _indexSearcher = new IndexSearcher(_reader);
-    }
-  }
-
-  public static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
-      throws IOException {
-    for (IndexCommit commit : listCommits) {
-      if (commit.getGeneration() == generation) {
-        return commit;
-      }
-    }
-    throw new IOException("Generation [" + generation + "] not found in shard [" + shardDir + "]");
-  }
-
-  public static void setSnapshot(Job job, String snapshot) {
-    setSnapshot(job.getConfiguration(), snapshot);
-  }
-
-  public static void setSnapshot(Configuration configuration, String snapshot) {
-    configuration.set(BLUR_SNAPSHOT, snapshot);
-  }
-
-  public static String getSnapshot(Configuration configuration) {
-    return configuration.get(BLUR_SNAPSHOT);
-  }
-}
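
For context: the mapper above requires its input to be partitioned so that each
map task only ever sees rowids belonging to a single shard (hence the "Input
data is not partitioned correctly" check), and it pins all reads to a named
snapshot so every task searches the same index generation. A minimal,
hypothetical driver sketch for wiring it into a job follows; the job name,
snapshot name, and the elided input/output setup are placeholders, not part of
this commit.

    package org.apache.blur.mapreduce.lib.update;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class LookupDriverSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "existing-data-index-lookup");
        job.setJarByClass(LookupDriverSketch.class);
        job.setMapperClass(MapperForExistingDataWithIndexLookup.class);
        job.setMapOutputKeyClass(IndexKey.class);
        job.setMapOutputValueClass(IndexValue.class);
        // Pin every task to the same snapshot generation.
        MapperForExistingDataWithIndexLookup.setSnapshot(job, "updater-snapshot");
        // Input/output formats and paths omitted; setup(...) also expects the
        // table descriptor to be available via BlurOutputFormat.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }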

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
deleted file mode 100644
index d91d1f5..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-public class MapperForNewDataMod extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
-
-  private static final IndexValue EMPTY_RECORD = new IndexValue();
-  private long _timestamp;
-  private Counter _newRecords;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    InputSplit inputSplit = context.getInputSplit();
-    FileSplit fileSplit = getFileSplit(inputSplit);
-    Path path = fileSplit.getPath();
-    FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
-    FileStatus fileStatus = fileSystem.getFileStatus(path);
-    _timestamp = fileStatus.getModificationTime();
-    _newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS);
-  }
-
-  private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
-    if (inputSplit instanceof FileSplit) {
-      return (FileSplit) inputSplit;
-    }
-    if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
-      try {
-        Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
-        declaredField.setAccessible(true);
-        return getFileSplit((InputSplit) declaredField.get(inputSplit));
-      } catch (NoSuchFieldException e) {
-        throw new IOException(e);
-      } catch (SecurityException e) {
-        throw new IOException(e);
-      } catch (IllegalArgumentException e) {
-        throw new IOException(e);
-      } catch (IllegalAccessException e) {
-        throw new IOException(e);
-      }
-    } else {
-      throw new IOException("Unknown input split type [" + inputSplit + "] [" + inputSplit.getClass() + "]");
-    }
-  }
-
-  @Override
-  protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
-    IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
-    context.write(newDataKey, new IndexValue(blurRecord));
-    _newRecords.increment(1L);
-
-    IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
-    context.write(newDataMarker, EMPTY_RECORD);
-  }
-
-}
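
The reflection in getFileSplit(...) above exists because Hadoop's MultipleInputs
wraps every split in the package-private TaggedInputSplit, which hides the
underlying FileSplit (and with it the input file's modification time that this
mapper uses as the record timestamp). That strongly suggests the driver feeds
several inputs into one job via MultipleInputs; a hedged sketch of such wiring,
where the paths and the mapper/format pairings are assumptions rather than part
of this commit (PrunedSequenceFileInputFormat appears further below):

    package org.apache.blur.mapreduce.lib.update;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

    public class MultipleInputsWiringSketch {
      // Hypothetical: one job consuming both new-data sequence files and
      // rowid-lookup sequence files.  MultipleInputs is what introduces the
      // TaggedInputSplit that getFileSplit(...) has to unwrap by reflection.
      public static void wire(Job job, Path newDataPath, Path lookupPath) {
        MultipleInputs.addInputPath(job, newDataPath, PrunedSequenceFileInputFormat.class,
            MapperForNewDataMod.class);
        MultipleInputs.addInputPath(job, lookupPath, PrunedSequenceFileInputFormat.class,
            MapperForExistingDataWithIndexLookup.class);
      }
    }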

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
deleted file mode 100644
index bd8580e..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.index.AtomicReaderUtil;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.store.hdfs.DirectoryDecorator;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.HdfsBlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Progressable;
-import org.apache.lucene.index.AtomicReaderContext;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.SegmentInfo;
-import org.apache.lucene.index.SegmentInfoPerCommit;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.index.Terms;
-import org.apache.lucene.index.TermsEnum;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.BytesRef;
-
-public class MergeSortRowIdMatcher {
-
-  private static final String DEL = ".del";
-  private static final Log LOG = LogFactory.getLog(MergeSortRowIdMatcher.class);
-  private static final Progressable NO_OP = new Progressable() {
-    @Override
-    public void progress() {
-
-    }
-  };
-  private static final long _10_SECONDS = TimeUnit.SECONDS.toNanos(10);
-
-  public interface Action {
-    void found(Text rowId) throws IOException;
-  }
-
-  private final MyReader[] _readers;
-  private final Configuration _configuration;
-  private final Path _cachePath;
-  private final IndexCommit _indexCommit;
-  private final Directory _directory;
-  private final Progressable _progressable;
-
-  private DirectoryReader _reader;
-
-  public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath)
-      throws IOException {
-    this(directory, generation, configuration, cachePath, null);
-  }
-
-  public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath,
-      Progressable progressable) throws IOException {
-    List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
-    _indexCommit = findIndexCommit(listCommits, generation);
-    _configuration = configuration;
-    _cachePath = cachePath;
-    _directory = directory;
-    _progressable = progressable == null ? NO_OP : progressable;
-    _readers = openReaders();
-  }
-
-  public void lookup(Text rowId, Action action) throws IOException {
-    if (lookup(rowId)) {
-      action.found(rowId);
-    }
-  }
-
-  private boolean lookup(Text rowId) throws IOException {
-    advanceReadersIfNeeded(rowId);
-    sortReaders();
-    return checkReaders(rowId);
-  }
-
-  private boolean checkReaders(Text rowId) {
-    for (MyReader reader : _readers) {
-      int compareTo = reader.getCurrentRowId().compareTo(rowId);
-      if (compareTo == 0) {
-        return true;
-      } else if (compareTo > 0) {
-        return false;
-      }
-    }
-    return false;
-  }
-
-  private void advanceReadersIfNeeded(Text rowId) throws IOException {
-    _progressable.progress();
-    for (MyReader reader : _readers) {
-      if (rowId.compareTo(reader.getCurrentRowId()) > 0) {
-        advanceReader(reader, rowId);
-      }
-    }
-  }
-
-  private void advanceReader(MyReader reader, Text rowId) throws IOException {
-    while (reader.next()) {
-      if (rowId.compareTo(reader.getCurrentRowId()) <= 0) {
-        return;
-      }
-    }
-  }
-
-  private static final Comparator<MyReader> COMP = new Comparator<MyReader>() {
-    @Override
-    public int compare(MyReader o1, MyReader o2) {
-      return o1.getCurrentRowId().compareTo(o2.getCurrentRowId());
-    }
-  };
-
-  private void sortReaders() {
-    Arrays.sort(_readers, COMP);
-  }
-
-  private MyReader[] openReaders() throws IOException {
-    Collection<SegmentKey> segmentKeys = getSegmentKeys();
-    MyReader[] readers = new MyReader[segmentKeys.size()];
-    int i = 0;
-    for (SegmentKey segmentKey : segmentKeys) {
-      readers[i++] = openReader(segmentKey);
-    }
-    return readers;
-  }
-
-  private MyReader openReader(SegmentKey segmentKey) throws IOException {
-    Path file = getCacheFilePath(segmentKey);
-    FileSystem fileSystem = _cachePath.getFileSystem(_configuration);
-    if (!fileSystem.exists(file)) {
-      createCacheFile(file, segmentKey);
-    }
-    Reader reader = new SequenceFile.Reader(_configuration, SequenceFile.Reader.file(file));
-    return new MyReader(reader);
-  }
-
-  private void createCacheFile(Path file, SegmentKey segmentKey) throws IOException {
-    LOG.info("Building cache for segment [{0}] to [{1}]", segmentKey, file);
-    Path tmpPath = getTmpWriterPath(file.getParent());
-    try (Writer writer = createWriter(_configuration, tmpPath)) {
-      DirectoryReader reader = getReader();
-      for (AtomicReaderContext context : reader.leaves()) {
-        SegmentReader segmentReader = AtomicReaderUtil.getSegmentReader(context.reader());
-        if (segmentReader.getSegmentName().equals(segmentKey.getSegmentName())) {
-          writeRowIds(writer, segmentReader);
-          break;
-        }
-      }
-    }
-    commitWriter(_configuration, file, tmpPath);
-  }
-
-  public static void commitWriter(Configuration configuration, Path file, Path tmpPath) throws IOException {
-    FileSystem fileSystem = tmpPath.getFileSystem(configuration);
-    LOG.info("Commit tmp [{0}] to file [{1}]", tmpPath, file);
-    if (!fileSystem.rename(tmpPath, file)) {
-      LOG.warn("Could not commit tmp file [{0}] to file [{1}]", tmpPath, file);
-    }
-  }
-
-  public static Path getTmpWriterPath(Path dir) {
-    return new Path(dir, UUID.randomUUID().toString() + ".tmp");
-  }
-
-  public static Writer createWriter(Configuration configuration, Path tmpPath) throws IOException {
-    return SequenceFile.createWriter(configuration, SequenceFile.Writer.file(tmpPath),
-        SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(NullWritable.class),
-        SequenceFile.Writer.compression(CompressionType.BLOCK, getCodec(configuration)));
-  }
-
-  private static CompressionCodec getCodec(Configuration configuration) {
-    if (ZlibFactory.isNativeZlibLoaded(configuration)) {
-      return new GzipCodec();
-    }
-    return new DeflateCodec();
-  }
-
-  private void writeRowIds(Writer writer, SegmentReader segmentReader) throws IOException {
-    Terms terms = segmentReader.terms(BlurConstants.ROW_ID);
-    if (terms == null) {
-      return;
-    }
-    TermsEnum termsEnum = terms.iterator(null);
-    BytesRef rowId;
-    long s = System.nanoTime();
-    while ((rowId = termsEnum.next()) != null) {
-      long n = System.nanoTime();
-      if (n - s > _10_SECONDS) { // only report progress roughly every 10 seconds
-        _progressable.progress();
-        s = System.nanoTime();
-      }
-      writer.append(new Text(rowId.utf8ToString()), NullWritable.get());
-    }
-  }
-
-  private IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation) throws IOException {
-    for (IndexCommit commit : listCommits) {
-      if (commit.getGeneration() == generation) {
-        return commit;
-      }
-    }
-    throw new IOException("Generation [" + generation + "] not found.");
-  }
-
-  static class SegmentKey {
-
-    final String _segmentName;
-    final String _id;
-
-    SegmentKey(String segmentName, String id) throws IOException {
-      _segmentName = segmentName;
-      _id = id;
-    }
-
-    String getSegmentName() {
-      return _segmentName;
-    }
-
-    @Override
-    public String toString() {
-      return _id;
-    }
-  }
-
-  private DirectoryReader getReader() throws IOException {
-    if (_reader == null) {
-      _reader = DirectoryReader.open(_indexCommit);
-    }
-    return _reader;
-  }
-
-  private Collection<SegmentKey> getSegmentKeys() throws IOException {
-    List<SegmentKey> keys = new ArrayList<SegmentKey>();
-    SegmentInfos segmentInfos = new SegmentInfos();
-    segmentInfos.read(_directory, _indexCommit.getSegmentsFileName());
-    for (SegmentInfoPerCommit segmentInfoPerCommit : segmentInfos) {
-      String name = segmentInfoPerCommit.info.name;
-      String id = getId(segmentInfoPerCommit.info);
-      keys.add(new SegmentKey(name, id));
-    }
-    return keys;
-  }
-
-  private String getId(SegmentInfo si) throws IOException {
-    HdfsDirectory dir = getHdfsDirectory(si.dir);
-    Set<String> files = new TreeSet<String>(si.files());
-    return getId(_configuration, dir, files);
-  }
-
-  private static String getId(Configuration configuration, HdfsDirectory dir, Set<String> files) throws IOException {
-    long ts = 0;
-    String file = null;
-    for (String f : files) {
-      if (f.endsWith(DEL)) {
-        continue;
-      }
-      long fileModified = dir.getFileModified(f);
-      if (fileModified > ts) {
-        ts = fileModified;
-        file = f;
-      }
-    }
-
-    Path path = dir.getPath();
-    FileSystem fileSystem = path.getFileSystem(configuration);
-    Path realFile = new Path(path, file);
-    if (!fileSystem.exists(realFile)) {
-      realFile = dir.getRealFilePathFromSymlink(file);
-      if (!fileSystem.exists(realFile)) {
-        throw new IOException("Lucene file [" + file + "] for dir [" + path + "] can not be found.");
-      }
-    }
-    return getFirstBlockId(fileSystem, realFile);
-  }
-
-  public static String getIdForSingleSegmentIndex(Configuration configuration, Path indexPath) throws IOException {
-    HdfsDirectory dir = new HdfsDirectory(configuration, indexPath);
-    Set<String> files = new TreeSet<String>(Arrays.asList(dir.listAll()));
-    return getId(configuration, dir, files);
-  }
-
-  private static String getFirstBlockId(FileSystem fileSystem, Path realFile) throws IOException {
-    FileStatus fileStatus = fileSystem.getFileStatus(realFile);
-    BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, 1);
-    HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
-    LocatedBlock locatedBlock = location.getLocatedBlock();
-    ExtendedBlock block = locatedBlock.getBlock();
-    return toNiceString(block.getBlockId());
-  }
-
-  private static String toNiceString(long blockId) {
-    return "b" + blockId;
-  }
-
-  private static HdfsDirectory getHdfsDirectory(Directory dir) {
-    if (dir instanceof HdfsDirectory) {
-      return (HdfsDirectory) dir;
-    } else if (dir instanceof DirectoryDecorator) {
-      DirectoryDecorator dd = (DirectoryDecorator) dir;
-      return getHdfsDirectory(dd.getOriginalDirectory());
-    } else {
-      throw new RuntimeException("Unknown directory type.");
-    }
-  }
-
-  private Path getCacheFilePath(SegmentKey segmentKey) {
-    return new Path(_cachePath, segmentKey + ".seq");
-  }
-
-  static class MyReader {
-
-    final Reader _reader;
-    final Text _rowId = new Text();
-    boolean _finished = false;
-
-    public MyReader(Reader reader) {
-      _reader = reader;
-    }
-
-    public Text getCurrentRowId() {
-      return _rowId;
-    }
-
-    public boolean next() throws IOException {
-      if (_finished) {
-        return false;
-      }
-      if (_reader.next(_rowId)) {
-        return true;
-      }
-      _finished = true;
-      return false;
-    }
-
-    public boolean isFinished() {
-      return _finished;
-    }
-  }
-
-  public static Path getCachePath(Path cachePath, String table, String shardName) {
-    return new Path(new Path(cachePath, table), shardName);
-  }
-}
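
MergeSortRowIdMatcher above avoids per-rowid index searches altogether: it dumps
each segment's sorted ROW_ID terms into a block-compressed SequenceFile, cached
under a key derived from the segment's first HDFS block id so the cache survives
job restarts, and then merge-joins a sorted stream of incoming rowids against
those files. A usage sketch built only from the API shown above; the paths,
table name, and generation are placeholders (the real job obtains the
generation from SnapshotIndexDeletionPolicy for a named snapshot):

    package org.apache.blur.mapreduce.lib.update;

    import org.apache.blur.store.hdfs.HdfsDirectory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    public class MatcherUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path shardPath = new Path("/blur/tables/test/shard-00000000"); // placeholder
        long generation = 42L; // placeholder
        Path cache = MergeSortRowIdMatcher.getCachePath(new Path("/tmp/matcher-cache"), "test",
            "shard-00000000");
        MergeSortRowIdMatcher matcher = new MergeSortRowIdMatcher(new HdfsDirectory(conf, shardPath),
            generation, conf, cache);
        // Rowids must arrive in sorted order: lookup(...) only ever advances
        // the per-segment readers forward.
        matcher.lookup(new Text("row-00123"), new MergeSortRowIdMatcher.Action() {
          @Override
          public void found(Text rowId) {
            System.out.println("rowid exists in the index: " + rowId);
          }
        });
      }
    }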

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
deleted file mode 100644
index 8738c5a..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-public class PrunedBlurInputFormat extends BlurInputFormat {
-
-  private static final Log LOG = LogFactory.getLog(PrunedBlurInputFormat.class);
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    Path[] dirs = getInputPaths(context);
-    Configuration configuration = context.getConfiguration();
-    List<BlurInputSplit> splits = getSplits(configuration, dirs);
-    Map<Path, List<BlurInputSplit>> splitMap = new TreeMap<Path, List<BlurInputSplit>>();
-    for (BlurInputSplit split : splits) {
-      Path path = split.getDir();
-      String table = split.getTable().toString();
-      int shard = InputSplitPruneUtil.getShardFromDirectoryPath(path);
-      long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
-          table, shard);
-      long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
-      if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
-        LOG.info("Pruning id lookup input path [" + path + "] no overlapping ids.");
-      } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
-        LOG.info("Pruning blur input path [" + split.getDir() + "]");
-      } else {
-        LOG.debug("Keeping blur input path [" + split.getDir() + "]");
-        List<BlurInputSplit> list = splitMap.get(path);
-        if (list == null) {
-          splitMap.put(path, list = new ArrayList<BlurInputSplit>());
-        }
-        list.add(split);
-      }
-    }
-    List<InputSplit> result = new ArrayList<InputSplit>();
-    for (List<BlurInputSplit> lst : splitMap.values()) {
-      BlurInputSplitColletion blurInputSplitColletion = new BlurInputSplitColletion();
-      for (BlurInputSplit blurInputSplit : lst) {
-        blurInputSplitColletion.add(blurInputSplit);
-      }
-      result.add(blurInputSplitColletion);
-    }
-    return result;
-  }
-}
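
Note how this class and PrunedSequenceFileInputFormat (below) read the same
per-shard counters out of the job Configuration and make complementary
keep/prune decisions on shouldLookupExecuteOnShard(...): for a shard whose new
data overlaps existing rows, the existing records are re-read either by the
full index scan kept here or by the per-rowid lookup kept there, never by both;
shards with no overlap at all are pruned from both sides.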

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
deleted file mode 100644
index 58e9800..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
-import com.google.common.base.Splitter;
-
-public class PrunedSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
-
-  private static final Log LOG = LogFactory.getLog(PrunedSequenceFileInputFormat.class);
-
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-    List<InputSplit> splits = super.getSplits(job);
-    List<InputSplit> results = new ArrayList<InputSplit>();
-    Configuration configuration = job.getConfiguration();
-    String table = InputSplitPruneUtil.getTable(configuration);
-    for (InputSplit inputSplit : splits) {
-      FileSplit fileSplit = (FileSplit) inputSplit;
-      Path path = fileSplit.getPath();
-      LOG.debug("Getting shard index from path [" + path + "]");
-      String name = path.getName();
-      int shard = getShardIndex(name);
-      long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
-          table, shard);
-      long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
-      if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
-        LOG.info("Pruning id lookup input path [" + path + "] no overlapping ids.");
-      } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
-        LOG.debug("Keeping id lookup input path [" + path + "]");
-        results.add(inputSplit);
-      } else {
-        LOG.info("Pruning id lookup input path [" + path + "]");
-      }
-    }
-    return results;
-  }
-
-  private int getShardIndex(String name) {
-    // based on the reducer output file name format "part-r-00000", "part-r-00001", etc.
-    Iterable<String> split = Splitter.on('-').split(name);
-    List<String> parts = new ArrayList<String>();
-    for (String s : split) {
-      parts.add(s);
-    }
-    return Integer.parseInt(parts.get(2));
-  }
-
-}
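
The getShardIndex(...) helper above leans on the standard reducer output naming
convention; a quick standalone check of the expected behavior (the file name is
a placeholder):

    public class ShardIndexParseCheck {
      public static void main(String[] args) {
        String name = "part-r-00007";     // typical reducer output file name
        String[] parts = name.split("-"); // ["part", "r", "00007"]
        int shard = Integer.parseInt(parts[2]);
        System.out.println(shard);        // prints 7; leading zeros parse fine
      }
    }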

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/distribution-bin/pom.xml
----------------------------------------------------------------------
diff --git a/distribution-bin/pom.xml b/distribution-bin/pom.xml
index 4eeedc9..6bdc405 100644
--- a/distribution-bin/pom.xml
+++ b/distribution-bin/pom.xml
@@ -40,6 +40,11 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-indexer</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
 			<artifactId>blur-shell</artifactId>
 			<version>${project.version}</version>
 		</dependency>