Posted to blur-commits@incubator.apache.org by am...@apache.org on 2016/08/30 01:57:53 UTC
[08/13] git commit: Adding blur indexer project.
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
deleted file mode 100644
index 0e2fe66..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.manager.BlurPartitioner;
-import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.FetchRecordResult;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.RowDocumentUtil;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.AtomicReader;
-import org.apache.lucene.index.AtomicReaderContext;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import com.google.common.io.Closer;
-
-public class MapperForExistingDataWithIndexLookup extends Mapper<Text, BooleanWritable, IndexKey, IndexValue> {
-
- private static final Log LOG = LogFactory.getLog(MapperForExistingDataWithIndexLookup.class);
-
- private static final String BLUR_SNAPSHOT = "blur.snapshot";
- private Counter _existingRecords;
- private Counter _rowLookup;
- private BlurPartitioner _blurPartitioner;
- private Path _tablePath;
- private int _numberOfShardsInTable;
- private Configuration _configuration;
- private String _snapshot;
-
- private int _indexShard = -1;
- private DirectoryReader _reader;
- private IndexSearcher _indexSearcher;
- private long _totalNumberOfBytes;
- private Closer _closer;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- Counter counter = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER);
- counter.increment(1);
-
- _configuration = context.getConfiguration();
- _existingRecords = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_EXISTING_RECORDS);
- _rowLookup = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT);
- _blurPartitioner = new BlurPartitioner();
- TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
- _numberOfShardsInTable = tableDescriptor.getShardCount();
- _tablePath = new Path(tableDescriptor.getTableUri());
- _snapshot = getSnapshot(_configuration);
- _totalNumberOfBytes = _configuration.getLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
- _closer = Closer.create();
- }
-
- @Override
- protected void map(Text key, BooleanWritable value, Context context) throws IOException, InterruptedException {
- if (value.get()) {
- String rowId = key.toString();
- LOG.debug("Looking up rowid [" + rowId + "]");
- _rowLookup.increment(1);
- IndexSearcher indexSearcher = getIndexSearcher(rowId);
- Term term = new Term(BlurConstants.ROW_ID, rowId);
- RowCollector collector = getCollector(context);
- indexSearcher.search(new TermQuery(term), collector);
- LOG.debug("Looking for rowid [" + rowId + "] has [" + collector.records + "] records");
- }
- }
-
- @Override
- protected void cleanup(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) throws IOException,
- InterruptedException {
- _closer.close();
- }
-
- static class RowCollector extends Collector {
-
- private AtomicReader reader;
- private Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context _context;
- private Counter _existingRecords;
- int records;
-
- RowCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context, Counter existingRecords) {
- _context = context;
- _existingRecords = existingRecords;
- }
-
- @Override
- public void setScorer(Scorer scorer) throws IOException {
-
- }
-
- @Override
- public void setNextReader(AtomicReaderContext context) throws IOException {
- reader = context.reader();
- }
-
- @Override
- public void collect(int doc) throws IOException {
- Document document = reader.document(doc);
- FetchRecordResult result = RowDocumentUtil.getRecord(document);
- String rowid = result.getRowid();
- Record record = result.getRecord();
- String recordId = record.getRecordId();
- IndexKey oldDataKey = IndexKey.oldData(rowid, recordId);
- try {
- _context.write(oldDataKey, new IndexValue(toBlurRecord(rowid, record)));
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- _existingRecords.increment(1L);
- }
-
- private BlurRecord toBlurRecord(String rowId, Record record) {
- BlurRecord blurRecord = new BlurRecord();
- blurRecord.setRowId(rowId);
- blurRecord.setRecordId(record.getRecordId());
- blurRecord.setFamily(record.getFamily());
- List<Column> columns = record.getColumns();
- for (Column column : columns) {
- blurRecord.addColumn(column.getName(), column.getValue());
- }
- return blurRecord;
- }
-
- @Override
- public boolean acceptsDocsOutOfOrder() {
- return false;
- }
- }
-
- private RowCollector getCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) {
- return new RowCollector(context, _existingRecords);
- }
-
- private IndexSearcher getIndexSearcher(String rowId) throws IOException {
- int shard = _blurPartitioner.getShard(rowId, _numberOfShardsInTable);
- if (_indexSearcher != null) {
- if (shard != _indexShard) {
- throw new IOException("Input data is not partitioned correctly.");
- }
- return _indexSearcher;
- } else {
- _indexShard = shard;
- Path shardPath = new Path(_tablePath, ShardUtil.getShardName(_indexShard));
- HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
- SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
- SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
- Long generation = policy.getGeneration(_snapshot);
- if (generation == null) {
- hdfsDirectory.close();
- throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
- }
-
- BlurConfiguration bc = new BlurConfiguration();
- BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
- _totalNumberOfBytes);
- _closer.register(blockCacheDirectoryFactoryV2);
- Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
-
- List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
- IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardPath);
- _reader = DirectoryReader.open(indexCommit);
- return _indexSearcher = new IndexSearcher(_reader);
- }
- }
-
- public static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
- throws IOException {
- for (IndexCommit commit : listCommits) {
- if (commit.getGeneration() == generation) {
- return commit;
- }
- }
- throw new IOException("Generation [" + generation + "] not found in shard [" + shardDir + "]");
- }
-
- public static void setSnapshot(Job job, String snapshot) {
- setSnapshot(job.getConfiguration(), snapshot);
- }
-
- public static void setSnapshot(Configuration configuration, String snapshot) {
- configuration.set(BLUR_SNAPSHOT, snapshot);
- }
-
- public static String getSnapshot(Configuration configuration) {
- return configuration.get(BLUR_SNAPSHOT);
- }
-}
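
The mapper above resolves each incoming rowid to its shard and searches a named snapshot of that shard's index, so the driver has to register the snapshot name on the job before launch. A minimal sketch, assuming standard Hadoop job setup (the job name, snapshot name, and everything outside this file's API are illustrative assumptions, not part of this commit):

// Hypothetical driver wiring; only setSnapshot comes from the class above.
Job job = Job.getInstance(new Configuration(), "existing-data-index-lookup");
job.setMapperClass(MapperForExistingDataWithIndexLookup.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
// Point the mapper at the index snapshot taken before the update ran.
MapperForExistingDataWithIndexLookup.setSnapshot(job, "pre-update-snapshot");
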
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
deleted file mode 100644
index d91d1f5..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-public class MapperForNewDataMod extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
-
- private static final IndexValue EMPTY_RECORD = new IndexValue();
- private long _timestamp;
- private Counter _newRecords;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- InputSplit inputSplit = context.getInputSplit();
- FileSplit fileSplit = getFileSplit(inputSplit);
- Path path = fileSplit.getPath();
- FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
- FileStatus fileStatus = fileSystem.getFileStatus(path);
- _timestamp = fileStatus.getModificationTime();
- _newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS);
- }
-
- private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
- if (inputSplit instanceof FileSplit) {
- return (FileSplit) inputSplit;
- }
- if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
- try {
- Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
- declaredField.setAccessible(true);
- return getFileSplit((InputSplit) declaredField.get(inputSplit));
- } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
- throw new IOException(e);
- }
- } else {
- throw new IOException("Unknown input split type [" + inputSplit + "] [" + inputSplit.getClass() + "]");
- }
- }
-
- @Override
- protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
- IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
- context.write(newDataKey, new IndexValue(blurRecord));
- _newRecords.increment(1L);
-
- IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
- context.write(newDataMarker, EMPTY_RECORD);
- }
-
-}
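
The reflection in getFileSplit is needed because MultipleInputs wraps every split in a package-private TaggedInputSplit, which cannot be cast to FileSplit directly. A sketch of the driver-side call that produces such splits (the input path is an assumption):

// Assumed driver setup; MultipleInputs hands this mapper TaggedInputSplit
// instances at runtime, hence the reflective unwrapping above.
MultipleInputs.addInputPath(job, new Path("/data/new"), SequenceFileInputFormat.class,
    MapperForNewDataMod.class);
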
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
deleted file mode 100644
index bd8580e..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.index.AtomicReaderUtil;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.store.hdfs.DirectoryDecorator;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.HdfsBlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Progressable;
-import org.apache.lucene.index.AtomicReaderContext;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.SegmentInfo;
-import org.apache.lucene.index.SegmentInfoPerCommit;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.index.Terms;
-import org.apache.lucene.index.TermsEnum;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.BytesRef;
-
-public class MergeSortRowIdMatcher {
-
- private static final String DEL = ".del";
- private static final Log LOG = LogFactory.getLog(MergeSortRowIdMatcher.class);
- private static final Progressable NO_OP = new Progressable() {
- @Override
- public void progress() {
-
- }
- };
- private static final long _10_SECONDS = TimeUnit.SECONDS.toNanos(10);
-
- public interface Action {
- void found(Text rowId) throws IOException;
- }
-
- private final MyReader[] _readers;
- private final Configuration _configuration;
- private final Path _cachePath;
- private final IndexCommit _indexCommit;
- private final Directory _directory;
- private final Progressable _progressable;
-
- private DirectoryReader _reader;
-
- public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath)
- throws IOException {
- this(directory, generation, configuration, cachePath, null);
- }
-
- public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath,
- Progressable progressable) throws IOException {
- List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
- _indexCommit = findIndexCommit(listCommits, generation);
- _configuration = configuration;
- _cachePath = cachePath;
- _directory = directory;
- _progressable = progressable == null ? NO_OP : progressable;
- _readers = openReaders();
- }
-
- public void lookup(Text rowId, Action action) throws IOException {
- if (lookup(rowId)) {
- action.found(rowId);
- }
- }
-
- private boolean lookup(Text rowId) throws IOException {
- advanceReadersIfNeeded(rowId);
- sortReaders();
- return checkReaders(rowId);
- }
-
- private boolean checkReaders(Text rowId) {
- for (MyReader reader : _readers) {
- int compareTo = reader.getCurrentRowId().compareTo(rowId);
- if (compareTo == 0) {
- return true;
- } else if (compareTo > 0) {
- return false;
- }
- }
- return false;
- }
-
- private void advanceReadersIfNeeded(Text rowId) throws IOException {
- _progressable.progress();
- for (MyReader reader : _readers) {
- if (rowId.compareTo(reader.getCurrentRowId()) > 0) {
- advanceReader(reader, rowId);
- }
- }
- }
-
- private void advanceReader(MyReader reader, Text rowId) throws IOException {
- while (reader.next()) {
- if (rowId.compareTo(reader.getCurrentRowId()) <= 0) {
- return;
- }
- }
- }
-
- private static final Comparator<MyReader> COMP = new Comparator<MyReader>() {
- @Override
- public int compare(MyReader o1, MyReader o2) {
- return o1.getCurrentRowId().compareTo(o2.getCurrentRowId());
- }
- };
-
- private void sortReaders() {
- Arrays.sort(_readers, COMP);
- }
-
- private MyReader[] openReaders() throws IOException {
- Collection<SegmentKey> segmentKeys = getSegmentKeys();
- MyReader[] readers = new MyReader[segmentKeys.size()];
- int i = 0;
- for (SegmentKey segmentKey : segmentKeys) {
- readers[i++] = openReader(segmentKey);
- }
- return readers;
- }
-
- private MyReader openReader(SegmentKey segmentKey) throws IOException {
- Path file = getCacheFilePath(segmentKey);
- FileSystem fileSystem = _cachePath.getFileSystem(_configuration);
- if (!fileSystem.exists(file)) {
- createCacheFile(file, segmentKey);
- }
- Reader reader = new SequenceFile.Reader(_configuration, SequenceFile.Reader.file(file));
- return new MyReader(reader);
- }
-
- private void createCacheFile(Path file, SegmentKey segmentKey) throws IOException {
- LOG.info("Building cache for segment [{0}] to [{1}]", segmentKey, file);
- Path tmpPath = getTmpWriterPath(file.getParent());
- try (Writer writer = createWriter(_configuration, tmpPath)) {
- DirectoryReader reader = getReader();
- for (AtomicReaderContext context : reader.leaves()) {
- SegmentReader segmentReader = AtomicReaderUtil.getSegmentReader(context.reader());
- if (segmentReader.getSegmentName().equals(segmentKey.getSegmentName())) {
- writeRowIds(writer, segmentReader);
- break;
- }
- }
- }
- commitWriter(_configuration, file, tmpPath);
- }
-
- public static void commitWriter(Configuration configuration, Path file, Path tmpPath) throws IOException {
- FileSystem fileSystem = tmpPath.getFileSystem(configuration);
- LOG.info("Commit tmp [{0}] to file [{1}]", tmpPath, file);
- if (!fileSystem.rename(tmpPath, file)) {
- LOG.warn("Could not commit tmp file [{0}] to file [{1}]", tmpPath, file);
- }
- }
-
- public static Path getTmpWriterPath(Path dir) {
- return new Path(dir, UUID.randomUUID().toString() + ".tmp");
- }
-
- public static Writer createWriter(Configuration configuration, Path tmpPath) throws IOException {
- return SequenceFile.createWriter(configuration, SequenceFile.Writer.file(tmpPath),
- SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(NullWritable.class),
- SequenceFile.Writer.compression(CompressionType.BLOCK, getCodec(configuration)));
- }
-
- private static CompressionCodec getCodec(Configuration configuration) {
- if (ZlibFactory.isNativeZlibLoaded(configuration)) {
- return new GzipCodec();
- }
- return new DeflateCodec();
- }
-
- private void writeRowIds(Writer writer, SegmentReader segmentReader) throws IOException {
- Terms terms = segmentReader.terms(BlurConstants.ROW_ID);
- if (terms == null) {
- return;
- }
- TermsEnum termsEnum = terms.iterator(null);
- BytesRef rowId;
- long s = System.nanoTime();
- while ((rowId = termsEnum.next()) != null) {
- long n = System.nanoTime();
- if (n - s > _10_SECONDS) { // report progress at most every ~10 seconds
- _progressable.progress();
- s = System.nanoTime();
- }
- writer.append(new Text(rowId.utf8ToString()), NullWritable.get());
- }
- }
-
- private IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation) throws IOException {
- for (IndexCommit commit : listCommits) {
- if (commit.getGeneration() == generation) {
- return commit;
- }
- }
- throw new IOException("Generation [" + generation + "] not found.");
- }
-
- static class SegmentKey {
-
- final String _segmentName;
- final String _id;
-
- SegmentKey(String segmentName, String id) throws IOException {
- _segmentName = segmentName;
- _id = id;
- }
-
- String getSegmentName() {
- return _segmentName;
- }
-
- @Override
- public String toString() {
- return _id;
- }
- }
-
- private DirectoryReader getReader() throws IOException {
- if (_reader == null) {
- _reader = DirectoryReader.open(_indexCommit);
- }
- return _reader;
- }
-
- private Collection<SegmentKey> getSegmentKeys() throws IOException {
- List<SegmentKey> keys = new ArrayList<SegmentKey>();
- SegmentInfos segmentInfos = new SegmentInfos();
- segmentInfos.read(_directory, _indexCommit.getSegmentsFileName());
- for (SegmentInfoPerCommit segmentInfoPerCommit : segmentInfos) {
- String name = segmentInfoPerCommit.info.name;
- String id = getId(segmentInfoPerCommit.info);
- keys.add(new SegmentKey(name, id));
- }
- return keys;
- }
-
- private String getId(SegmentInfo si) throws IOException {
- HdfsDirectory dir = getHdfsDirectory(si.dir);
- Set<String> files = new TreeSet<String>(si.files());
- return getId(_configuration, dir, files);
- }
-
- private static String getId(Configuration configuration, HdfsDirectory dir, Set<String> files) throws IOException {
- long ts = 0;
- String file = null;
- for (String f : files) {
- if (f.endsWith(DEL)) {
- continue;
- }
- long fileModified = dir.getFileModified(f);
- if (fileModified > ts) {
- ts = fileModified;
- file = f;
- }
- }
-
- Path path = dir.getPath();
- FileSystem fileSystem = path.getFileSystem(configuration);
- Path realFile = new Path(path, file);
- if (!fileSystem.exists(realFile)) {
- realFile = dir.getRealFilePathFromSymlink(file);
- if (!fileSystem.exists(realFile)) {
- throw new IOException("Lucene file [" + file + "] for dir [" + path + "] can not be found.");
- }
- }
- return getFirstBlockId(fileSystem, realFile);
- }
-
- public static String getIdForSingleSegmentIndex(Configuration configuration, Path indexPath) throws IOException {
- HdfsDirectory dir = new HdfsDirectory(configuration, indexPath);
- Set<String> files = new TreeSet<String>(Arrays.asList(dir.listAll()));
- return getId(configuration, dir, files);
- }
-
- private static String getFirstBlockId(FileSystem fileSystem, Path realFile) throws IOException {
- FileStatus fileStatus = fileSystem.getFileStatus(realFile);
- BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, 1);
- HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
- LocatedBlock locatedBlock = location.getLocatedBlock();
- ExtendedBlock block = locatedBlock.getBlock();
- return toNiceString(block.getBlockId());
- }
-
- private static String toNiceString(long blockId) {
- return "b" + blockId;
- }
-
- private static HdfsDirectory getHdfsDirectory(Directory dir) {
- if (dir instanceof HdfsDirectory) {
- return (HdfsDirectory) dir;
- } else if (dir instanceof DirectoryDecorator) {
- DirectoryDecorator dd = (DirectoryDecorator) dir;
- return getHdfsDirectory(dd.getOriginalDirectory());
- } else {
- throw new RuntimeException("Unknown directory type.");
- }
- }
-
- private Path getCacheFilePath(SegmentKey segmentKey) {
- return new Path(_cachePath, segmentKey + ".seq");
- }
-
- static class MyReader {
-
- final Reader _reader;
- final Text _rowId = new Text();
- boolean _finished = false;
-
- public MyReader(Reader reader) {
- _reader = reader;
- }
-
- public Text getCurrentRowId() {
- return _rowId;
- }
-
- public boolean next() throws IOException {
- if (_finished) {
- return false;
- }
- if (_reader.next(_rowId)) {
- return true;
- }
- _finished = true;
- return false;
- }
-
- public boolean isFinished() {
- return _finished;
- }
- }
-
- public static Path getCachePath(Path cachePath, String table, String shardName) {
- return new Path(new Path(cachePath, table), shardName);
- }
-}
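
A minimal usage sketch for the matcher above. The directory, generation, configuration, cachePath, and the sorted rowid source are assumptions supplied by the caller; note that lookups must be issued in ascending rowid order, since the underlying readers only advance forward:

MergeSortRowIdMatcher matcher = new MergeSortRowIdMatcher(directory, generation,
    configuration, cachePath);
for (Text rowId : sortedRowIds) { // must iterate rowids in ascending order
  matcher.lookup(rowId, new MergeSortRowIdMatcher.Action() {
    @Override
    public void found(Text existingRowId) throws IOException {
      // rowid already present in the index snapshot
    }
  });
}
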
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
deleted file mode 100644
index 8738c5a..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-public class PrunedBlurInputFormat extends BlurInputFormat {
-
- private static final Log LOG = LogFactory.getLog(PrunedBlurInputFormat.class);
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException {
- Path[] dirs = getInputPaths(context);
- Configuration configuration = context.getConfiguration();
- List<BlurInputSplit> splits = getSplits(configuration, dirs);
- Map<Path, List<BlurInputSplit>> splitMap = new TreeMap<Path, List<BlurInputSplit>>();
- for (BlurInputSplit split : splits) {
- Path path = split.getDir();
- String table = split.getTable().toString();
- int shard = InputSplitPruneUtil.getShardFromDirectoryPath(path);
- long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
- table, shard);
- long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
- if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
- LOG.info("Pruning id lookup input path [" + path + "] no overlapping ids.");
- } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
- LOG.info("Pruning blur input path [" + split.getDir() + "]");
- } else {
- LOG.debug("Keeping blur input path [" + split.getDir() + "]");
- List<BlurInputSplit> list = splitMap.get(path);
- if (list == null) {
- splitMap.put(path, list = new ArrayList<BlurInputSplit>());
- }
- list.add(split);
- }
- }
- List<InputSplit> result = new ArrayList<InputSplit>();
- for (List<BlurInputSplit> lst : splitMap.values()) {
- BlurInputSplitColletion blurInputSplitColletion = new BlurInputSplitColletion();
- for (BlurInputSplit blurInputSplit : lst) {
- blurInputSplitColletion.add(blurInputSplit);
- }
- result.add(blurInputSplitColletion);
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
deleted file mode 100644
index 58e9800..0000000
--- a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
-import com.google.common.base.Splitter;
-
-public class PrunedSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
-
- private static final Log LOG = LogFactory.getLog(PrunedSequenceFileInputFormat.class);
-
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- List<InputSplit> splits = super.getSplits(job);
- List<InputSplit> results = new ArrayList<InputSplit>();
- Configuration configuration = job.getConfiguration();
- String table = InputSplitPruneUtil.getTable(configuration);
- for (InputSplit inputSplit : splits) {
- FileSplit fileSplit = (FileSplit) inputSplit;
- Path path = fileSplit.getPath();
- LOG.debug("Getting shard index from path [" + path + "]");
- String name = path.getName();
- int shard = getShardIndex(name);
- long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
- table, shard);
- long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
- if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
- LOG.info("Pruning id lookup input path [" + path + "] no overlapping ids.");
- } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
- LOG.debug("Keeping id lookup input path [" + path + "]");
- results.add(inputSplit);
- } else {
- LOG.info("Pruning id lookup input path [" + path + "]");
- }
- }
- return results;
- }
-
- private int getShardIndex(String name) {
- // based on file format of "part-r-00000", etc
- Iterable<String> split = Splitter.on('-').split(name);
- List<String> parts = new ArrayList<String>();
- for (String s : split) {
- parts.add(s);
- }
- return Integer.parseInt(parts.get(2));
- }
-
-}
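
getShardIndex relies on the standard reducer output file naming noted in the comment above, for example:

// "part-r-00007" splits on '-' into ["part", "r", "00007"]; parts.get(2) parses to 7.
int shard = getShardIndex("part-r-00007"); // returns 7
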
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/distribution-bin/pom.xml
----------------------------------------------------------------------
diff --git a/distribution-bin/pom.xml b/distribution-bin/pom.xml
index 4eeedc9..6bdc405 100644
--- a/distribution-bin/pom.xml
+++ b/distribution-bin/pom.xml
@@ -40,6 +40,11 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.blur</groupId>
+ <artifactId>blur-indexer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.blur</groupId>
<artifactId>blur-shell</artifactId>
<version>${project.version}</version>
</dependency>