You are viewing a plain text version of this content. The canonical link for it is here.
Posted to blur-commits@incubator.apache.org by am...@apache.org on 2016/08/30 01:57:50 UTC
[05/13] git commit: Fourth round of udpates.
Fourth round of udpates.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/96a1821a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/96a1821a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/96a1821a
Branch: refs/heads/master
Commit: 96a1821a0a723fafefd79a8a6769cf1286e95eaa
Parents: ea50630
Author: Aaron McCurry <am...@gmail.com>
Authored: Sat May 7 13:12:48 2016 -0400
Committer: Aaron McCurry <am...@gmail.com>
Committed: Sat May 7 13:12:48 2016 -0400
----------------------------------------------------------------------
.../mapreduce/lib/GenericBlurRecordWriter.java | 57 +++++-
.../mapreduce/lib/PrimeDocOverFlowHelper.java | 31 ++-
.../mapreduce/lib/update/UpdateReducer.java | 23 ++-
.../blur/lucene/search/FacetExecutor.java | 89 ++++++---
.../apache/blur/lucene/search/FacetQuery.java | 14 +-
.../BaseReadMaskFieldTypeDefinitionTest.java | 13 +-
blur-shell/pom.xml | 7 +
.../ListRunningPlatformCommandsCommand.java | 43 ++--
.../main/java/org/apache/blur/shell/Main.java | 3 +-
.../org/apache/blur/shell/TableDisplay.java | 3 +-
.../org/apache/blur/shell/WatchCommands.java | 149 ++++++++++++++
.../lucene/codec/DiskDocValuesProducer.java | 195 ++++++++++++++-----
.../store/blockcache_v2/CacheIndexInput.java | 4 +-
.../blur/store/blockcache_v2/MeterWrapper.java | 13 +-
.../cachevalue/DetachableCacheValue.java | 48 +++--
.../blur/store/hdfs/SequentialReadControl.java | 3 +-
.../packed/DirectPacked64SingleBlockReader.java | 65 +++++++
.../lucene/util/packed/DirectPackedReader.java | 80 ++++++++
.../org/apache/blur/utils/BlurConstants.java | 5 +-
deploy.sh | 62 ++++++
20 files changed, 754 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
index a947980..8828f85 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -18,9 +18,12 @@ package org.apache.blur.mapreduce.lib;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.analysis.FieldManager;
import org.apache.blur.log.Log;
@@ -47,11 +50,11 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@@ -95,6 +98,7 @@ public class GenericBlurRecordWriter {
private ProgressableDirectory _localTmpDir;
private String _deletedRowId;
private Configuration _configuration;
+ private String _currentRowId;
public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
_configuration = configuration;
@@ -200,6 +204,7 @@ public class GenericBlurRecordWriter {
private void add(BlurMutate value) throws IOException {
BlurRecord blurRecord = value.getRecord();
+ _currentRowId = blurRecord.getRowId();
Record record = getRecord(blurRecord);
String recordId = record.getRecordId();
if (value.getMutateType() == MUTATE_TYPE.DELETE) {
@@ -224,7 +229,7 @@ public class GenericBlurRecordWriter {
private void flushToTmpIndexIfNeeded() throws IOException {
if (_documentBufferStrategy.isFull()) {
- LOG.info("Document Buffer is full overflow to disk.");
+ LOG.info("RowId [" + _currentRowId + "] - Document Buffer is full overflow to disk.");
flushToTmpIndex();
}
}
@@ -273,15 +278,35 @@ public class GenericBlurRecordWriter {
return record;
}
+ private static ThreadLocal<AtomicBoolean> _existingRow = new ThreadLocal<AtomicBoolean>() {
+ @Override
+ protected AtomicBoolean initialValue() {
+ return new AtomicBoolean();
+ }
+ };
+
+ public static boolean isCurrentRowExisting() {
+ return _existingRow.get().get();
+ }
+
+ public static void setCurrentRowExistingRowId(boolean existing) {
+ _existingRow.get().set(existing);
+ }
+
private void flush() throws CorruptIndexException, IOException {
+ boolean newRow = !isCurrentRowExisting();
if (_usingLocalTmpindex) {
// since we have flushed to disk then we do not need to index the
// delete.
flushToTmpIndex();
- _localTmpWriter.close(false);
+ LOG.info("RowId [" + _currentRowId + "] - forceMerge");
+ _localTmpWriter.forceMerge(1, true);
+ _localTmpWriter.close(true);
+
DirectoryReader reader = DirectoryReader.open(_localTmpDir);
- AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(reader);
- AtomicReader primeDocAtomicReader = PrimeDocOverFlowHelper.addPrimeDoc(atomicReader);
+ AtomicReader atomicReader = getAtomicReader(reader);
+ LOG.info("RowId [" + _currentRowId + "] - total documents [" + atomicReader.maxDoc() + "]");
+ AtomicReader primeDocAtomicReader = PrimeDocOverFlowHelper.addPrimeDoc(atomicReader, newRow, _currentRowId);
if (_countersSetup) {
_recordRateCounter.mark(reader.numDocs());
}
@@ -289,6 +314,7 @@ public class GenericBlurRecordWriter {
primeDocAtomicReader.close();
resetLocalTmp();
_writer.maybeMerge();
+ LOG.info("RowId [" + _currentRowId + "] - add complete");
if (_countersSetup) {
_rowOverFlowCount.increment(1);
}
@@ -303,6 +329,11 @@ public class GenericBlurRecordWriter {
} else {
List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+ if (newRow) {
+ docs.get(0).add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+ } else {
+ docs.get(0).add(new StringField(BlurConstants.UPDATE_ROW, _currentRowId, Store.NO));
+ }
_writer.addDocuments(docs);
if (_countersSetup) {
_recordRateCounter.mark(docs.size());
@@ -316,10 +347,19 @@ public class GenericBlurRecordWriter {
}
}
+ private AtomicReader getAtomicReader(DirectoryReader reader) throws IOException {
+ List<AtomicReaderContext> leaves = reader.leaves();
+ if (leaves.size() == 1) {
+ return leaves.get(0).reader();
+ }
+ throw new IOException("Reader [" + reader + "] has more than one segment after optimize.");
+ }
+
private Document getDeleteDoc() {
Document document = new Document();
document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE, Store.NO));
+ document.add(new StringField(BlurConstants.UPDATE_ROW, _deletedRowId, Store.NO));
return document;
}
@@ -348,10 +388,17 @@ public class GenericBlurRecordWriter {
DirectoryReader reader = DirectoryReader.open(_localDir);
IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
writer.addIndexes(reader);
+ writer.setCommitData(getInternalMarker());
writer.close();
rm(_localPath);
}
+ private Map<String, String> getInternalMarker() {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(BlurConstants.INTERNAL, BlurConstants.INTERNAL);
+ return map;
+ }
+
private void copyDir() throws IOException {
CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
String[] fileNames = _localDir.listAll();
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
index 672a1c1..73d9c78 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
@@ -38,26 +38,47 @@ import org.apache.lucene.util.Version;
public class PrimeDocOverFlowHelper {
- private static Directory _directory;
+ private static Directory _directoryNewRow;
static {
try {
- _directory = new RAMDirectory();
- IndexWriter writer = new IndexWriter(_directory, new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer()));
+ _directoryNewRow = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(_directoryNewRow, new IndexWriterConfig(Version.LUCENE_43,
+ new KeywordAnalyzer()));
Document document = new Document();
document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+ document.add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO));
writer.addDocument(document);
writer.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
+
}
- public static AtomicReader addPrimeDoc(AtomicReader atomicReader) throws IOException {
- AtomicReaderContext context = DirectoryReader.open(_directory).leaves().get(0);
+ public static AtomicReader addPrimeDoc(AtomicReader atomicReader, boolean newRow, String currentRowId)
+ throws IOException {
+ AtomicReaderContext context = DirectoryReader.open(newRow ? _directoryNewRow : getDirectoryUpdateRow(currentRowId))
+ .leaves().get(0);
return new ParallelAtomicReader(true, setDocSize(context.reader(), atomicReader.maxDoc()), atomicReader);
}
+ private static Directory getDirectoryUpdateRow(String currentRowId) {
+ try {
+ RAMDirectory directoryUpdateRow = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(directoryUpdateRow, new IndexWriterConfig(Version.LUCENE_43,
+ new KeywordAnalyzer()));
+ Document document = new Document();
+ document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+ document.add(new StringField(BlurConstants.UPDATE_ROW, currentRowId, Store.NO));
+ writer.addDocument(document);
+ writer.close();
+ return directoryUpdateRow;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static AtomicReader setDocSize(AtomicReader reader, final int count) {
return new FilterAtomicReader(reader) {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
index f8705aa..d62617b 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
@@ -20,10 +20,11 @@ import java.io.IOException;
import org.apache.blur.mapreduce.lib.BlurMutate;
import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
-import org.apache.blur.mapreduce.lib.update.IndexKey.TYPE;
import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.GenericBlurRecordWriter;
import org.apache.blur.mapreduce.lib.GetCounter;
+import org.apache.blur.mapreduce.lib.update.IndexKey.TYPE;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
@@ -37,9 +38,9 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat
private static final String MARKER_RECORDS = "Marker Records";
private static final String SEP = " - ";
private static final String BLUR_UPDATE = "Blur Update";
- private static final String EXISTING_RCORDS = "Existing Rcords";
- private static final String NEW_RCORDS = "New Rcords";
- private static final String NO_UPDATE = "NoUpdate";
+ private static final String EXISTING_RECORDS = "Existing Records";
+ private static final String NEW_RECORDS = "New Records";
+ private static final String NO_UPDATE = "No Update";
private static final String UPDATE = "Update";
private static final String BLUR_UPDATE_DEBUG = BLUR_UPDATE + SEP + "DEBUG";
@@ -64,10 +65,10 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat
}
});
- _newRecordsUpdate = context.getCounter(BLUR_UPDATE, NEW_RCORDS + SEP + UPDATE);
- _newRecordsNoUpdate = context.getCounter(BLUR_UPDATE, NEW_RCORDS + SEP + NO_UPDATE);
- _existingRecordsUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RCORDS + SEP + UPDATE);
- _existingRecordsNoUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RCORDS + SEP + NO_UPDATE);
+ _newRecordsUpdate = context.getCounter(BLUR_UPDATE, NEW_RECORDS + SEP + UPDATE);
+ _newRecordsNoUpdate = context.getCounter(BLUR_UPDATE, NEW_RECORDS + SEP + NO_UPDATE);
+ _existingRecordsUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RECORDS + SEP + UPDATE);
+ _existingRecordsNoUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RECORDS + SEP + NO_UPDATE);
_ignoredExistingRows = context.getCounter(BLUR_UPDATE, IGNORED_EXISTING_ROWS);
_debugRecordsWithSameRecordId = context.getCounter(BLUR_UPDATE_DEBUG, MULTIPLE_RECORD_W_SAME_RECORD_ID);
@@ -76,7 +77,6 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat
_debugMarkerRecordsUpdate = context.getCounter(BLUR_UPDATE_DEBUG, MARKER_RECORDS + SEP + UPDATE);
_debugIndexValues = context.getCounter(BLUR_UPDATE_DEBUG, INDEX_VALUES);
_debugNullBlurRecords = context.getCounter(BLUR_UPDATE_DEBUG, NULL_BLUR_RECORDS);
-
}
@Override
@@ -93,6 +93,7 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat
InterruptedException {
BlurRecord prevBlurRecord = null;
String prevRecordId = null;
+ boolean existing = false;
for (IndexValue value : values) {
updateCounters(true, key);
BlurRecord br = value.getBlurRecord();
@@ -103,6 +104,9 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat
} else {
// Safe Copy
BlurRecord currentBlurRecord = new BlurRecord(br);
+ if (key.getType() == IndexKey.TYPE.OLD_DATA) {
+ existing = true;
+ }
String currentRecordId = currentBlurRecord.getRecordId();
if (prevRecordId != null) {
if (prevRecordId.equals(currentRecordId)) {
@@ -120,6 +124,7 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat
if (prevBlurRecord != null) {
context.write(new Text(prevBlurRecord.getRowId()), toMutate(prevBlurRecord));
}
+ GenericBlurRecordWriter.setCurrentRowExistingRowId(existing);
}
private void updateCounters(boolean update, IndexKey key) {
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
index 683ba98..00383f7 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
@@ -18,11 +18,13 @@ package org.apache.blur.lucene.search;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,10 +39,11 @@ import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.trace.Trace;
import org.apache.blur.trace.Tracer;
+import org.apache.blur.user.User;
+import org.apache.blur.user.UserContext;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.util.OpenBitSet;
@@ -75,19 +78,21 @@ public class FacetExecutor {
public void collect(int doc) throws IOException {
if (_bitSet.fastGet(doc)) {
_hits++;
- } else {
- int nextSetBit = _bitSet.nextSetBit(doc);
- if (nextSetBit < 0) {
- throw new Finished();
- } else {
- int advance = _scorer.advance(nextSetBit);
- if (advance == DocIdSetIterator.NO_MORE_DOCS) {
- throw new Finished();
- }
- if (_bitSet.fastGet(advance)) {
- _hits++;
- }
- }
+ // } else {
+ // int nextSetBit = _bitSet.nextSetBit(doc);
+ // if (nextSetBit < 0) {
+ // LOG.debug("finished early, no more hits in query.");
+ // throw new Finished();
+ // } else {
+ // int advance = _scorer.advance(nextSetBit);
+ // if (advance == DocIdSetIterator.NO_MORE_DOCS) {
+ // LOG.debug("finished early, no more hits in facet.");
+ // throw new Finished();
+ // }
+ // if (_bitSet.fastGet(advance)) {
+ // _hits++;
+ // }
+ // }
}
}
@@ -137,14 +142,17 @@ public class FacetExecutor {
final String _readerStr;
final int _maxDoc;
final Lock[] _locks;
+ final String _instance;
@Override
public String toString() {
- return "Info scorers length [" + _scorers.length + "] reader [" + _reader + "]";
+ return "Info scorers length [" + _scorers.length + "] reader [" + _reader + "] scorers ["
+ + Arrays.asList(_scorers) + "]";
}
- Info(AtomicReaderContext context, Scorer[] scorers, Lock[] locks) {
+ Info(AtomicReaderContext context, Scorer[] scorers, Lock[] locks, String instance) {
AtomicReader reader = context.reader();
+ _instance = instance;
_bitSet = new OpenBitSet(reader.maxDoc());
_scorers = scorers;
_reader = reader;
@@ -155,13 +163,18 @@ public class FacetExecutor {
void process(AtomicLongArray counts, long[] minimumsBeforeReturning, AtomicBoolean running) throws IOException {
if (minimumsBeforeReturning == null) {
+ LOG.debug(getPrefix("no minimums before returning."));
Tracer trace = Trace.trace("processing facet - segment", Trace.param("reader", _readerStr),
Trace.param("maxDoc", _maxDoc), Trace.param("minimums", "NONE"), Trace.param("scorers", _scorers.length));
try {
for (int i = 0; i < _scorers.length && running.get(); i++) {
+ LOG.debug(getPrefix("running facet for scorer [{0}] [{1}]."), i, _scorers[i]);
SimpleCollector col = new SimpleCollector(_bitSet);
runFacet(counts, col, i);
}
+ if (!running.get()) {
+ LOG.debug(getPrefix("running was stopped."));
+ }
} finally {
trace.done();
}
@@ -177,7 +190,7 @@ public class FacetExecutor {
long min = minimumsBeforeReturning[id];
long currentCount = counts.get(id);
if (currentCount < min) {
- LOG.debug("Running facet, current count [{0}] min [{1}]", currentCount, min);
+ LOG.debug(getPrefix("Running facet, current count [{0}] min [{1}]"), currentCount, min);
SimpleCollectorExitEarly col = new SimpleCollectorExitEarly(_bitSet, currentCount, min);
runFacet(counts, col, id);
}
@@ -188,6 +201,9 @@ public class FacetExecutor {
ids.put(id);
}
}
+ if (!running.get()) {
+ LOG.debug(getPrefix("running was stopped."));
+ }
} catch (Exception e) {
throw new IOException(e);
}
@@ -206,19 +222,26 @@ public class FacetExecutor {
Tracer traceInner = Trace.trace("processing facet - segment - scorer", Trace.param("scorer", scorer),
Trace.param("scorer.cost", scorer.cost()));
try {
- // new ExitScorer(scorer).score(col);
+ LOG.debug(getPrefix("starting scorer [" + i + "]."));
scorer.score(col);
} catch (Finished e) {
// Do nothing, exiting early.
+ LOG.debug(getPrefix("finished early."));
} finally {
traceInner.done();
}
int hits = col._hits;
- LOG.debug("Facet [{0}] result [{1}]", i, hits);
+ LOG.debug(getPrefix("Facet [{0}] result [{1}]"), i, hits);
counts.addAndGet(i, hits);
+ } else {
+ LOG.debug(getPrefix("scorer [" + i + "] is null."));
}
col._hits = 0;
}
+
+ private String getPrefix(String s) {
+ return _instance + " " + s;
+ }
}
private final Map<Object, Info> _infoMap = new ConcurrentHashMap<Object, FacetExecutor.Info>();
@@ -228,6 +251,7 @@ public class FacetExecutor {
private final Lock[] _locks;
private final AtomicBoolean _running;
private boolean _processed;
+ private final String _instance = UUID.randomUUID().toString();
public FacetExecutor(int length) {
this(length, null, new AtomicLongArray(length));
@@ -250,26 +274,34 @@ public class FacetExecutor {
_locks[i] = new ReentrantReadWriteLock().writeLock();
}
_running = running;
+ User user = UserContext.getUser();
+ LOG.debug(getPrefix("User [{0}]"), user);
}
public void addScorers(AtomicReaderContext context, Scorer[] scorers) throws IOException {
+ LOG.debug(getPrefix("adding scorers context [{0}] [{1}]"), context, Arrays.asList(scorers));
if (scorers.length != _length) {
throw new IOException("Scorer length is not correct expecting [" + _length + "] actual [" + scorers.length + "]");
}
Object key = getKey(context);
Info info = _infoMap.get(key);
if (info == null) {
- info = new Info(context, scorers, _locks);
+ info = new Info(context, scorers, _locks, _instance);
_infoMap.put(key, info);
} else {
AtomicReader reader = context.reader();
- LOG.warn("Info about reader context [{0}] already created, existing Info [{1}] current reader [{2}].", context,
- info, reader);
+ LOG.warn(getPrefix("Info about reader context [{0}] already created, existing Info [{1}] current reader [{2}]."),
+ context, info, reader);
}
}
+ public String getPrefix(String s) {
+ return _instance + " " + s;
+ }
+
public boolean scorersAlreadyAdded(AtomicReaderContext context) {
Object key = getKey(context);
+ LOG.debug(getPrefix("scorersAlreadyAdded key [{0}]"), context);
return _infoMap.containsKey(key);
}
@@ -297,7 +329,9 @@ public class FacetExecutor {
}
public void processFacets(ExecutorService executor) throws IOException {
+ LOG.debug(getPrefix("processFacets called"));
if (!_processed) {
+ LOG.debug(getPrefix("processing Facets"));
Tracer trace = Trace.trace("processing facets");
try {
processInternal(executor);
@@ -310,11 +344,16 @@ public class FacetExecutor {
private void processInternal(ExecutorService executor) throws IOException {
List<Entry<Object, Info>> entries = new ArrayList<Entry<Object, Info>>(_infoMap.entrySet());
+ LOG.debug(getPrefix("entries count [{0}]"), entries.size());
Collections.sort(entries, COMPARATOR);
if (executor == null) {
+ LOG.debug(getPrefix("no executor"), entries.size());
for (Entry<Object, Info> e : entries) {
if (_running.get()) {
+ LOG.debug(getPrefix("processing [{0}] [{1}]"), e.getKey(), e.getValue());
e.getValue().process(_counts, _minimumsBeforeReturning, _running);
+ } else {
+ LOG.debug(getPrefix("No longer running."));
}
}
} else {
@@ -326,14 +365,18 @@ public class FacetExecutor {
@Override
public void run() {
try {
+ LOG.debug(getPrefix("processing [{0}] [{1}]"), entry.getKey(), entry.getValue());
entry.getValue().process(_counts, _minimumsBeforeReturning, _running);
} catch (Throwable e) {
- LOG.error("Unknown error", e);
+ LOG.error(getPrefix("Unknown error"), e);
} finally {
finished.incrementAndGet();
}
}
});
+ } else {
+ LOG.debug(getPrefix("No longer running."));
+ return;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java
index fbb342c..74ee6e0 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java
@@ -17,6 +17,7 @@ package org.apache.blur.lucene.search;
* limitations under the License.
*/
import java.io.IOException;
+import java.util.Arrays;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
@@ -32,6 +33,8 @@ import org.apache.lucene.util.OpenBitSet;
public class FacetQuery extends AbstractWrapperQuery {
+ private static final Log LOG = LogFactory.getLog(FacetQuery.class);
+
private final Query[] _facets;
private final FacetExecutor _executor;
@@ -69,10 +72,11 @@ public class FacetQuery extends AbstractWrapperQuery {
if (_rewritten) {
return this;
}
+ Query[] facets = new Query[_facets.length];
for (int i = 0; i < _facets.length; i++) {
- _facets[i] = _facets[i].rewrite(reader);
+ facets[i] = _facets[i].rewrite(reader);
}
- return new FacetQuery(_query.rewrite(reader), _facets, _executor, true);
+ return new FacetQuery(_query.rewrite(reader), facets, _executor, true);
}
@Override
@@ -125,7 +129,11 @@ public class FacetQuery extends AbstractWrapperQuery {
}
if (!_executor.scorersAlreadyAdded(context)) {
Scorer[] scorers = getScorers(context, true, topScorer, acceptDocs);
- _executor.addScorers(context, scorers);
+ LOG.debug(_executor.getPrefix("Adding scorers for context [{0}] scorers [{1}]"), context,
+ Arrays.asList(scorers));
+ _executor.addScorers(context, scorers);
+ } else {
+ LOG.debug(_executor.getPrefix("Scorers already added for context [{0}]"), context);
}
return new FacetScorer(scorer, _executor, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java
----------------------------------------------------------------------
diff --git a/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java b/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java
index 8be969b..4f4f29c 100644
--- a/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java
+++ b/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java
@@ -49,7 +49,6 @@ import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
@@ -160,7 +159,7 @@ public abstract class BaseReadMaskFieldTypeDefinitionTest {
}
private void setupFieldManager(BaseFieldManager fieldManager) throws IOException {
- fieldManager.addColumnDefinition(FAM, "string", null, false, "string", true, false, null);
+ fieldManager.addColumnDefinition(FAM, "string", null, false, "string", false, false, null);
fieldManager.addColumnDefinition(FAM, "string2", null, false, "string", false, false, null);
fieldManager.addColumnDefinition(FAM, "read", null, false, "acl-read", false, false, null);
fieldManager.addColumnDefinition(FAM, "mask", null, false, "read-mask", false, false, null);
@@ -222,16 +221,6 @@ public abstract class BaseReadMaskFieldTypeDefinitionTest {
assertEquals(defaultReadMask, s);
}
}
-
- String s = document.get("fam.string");
- if (s == null || s.equals(getDefaultReadMask())) {
- AtomicReader atomicReader = searcher.getIndexReader().leaves().get(0).reader();
- SortedDocValues sortedDocValues = atomicReader.getSortedDocValues("fam.string");
- BytesRef result = new BytesRef();
- sortedDocValues.get(doc, result);
- assertEquals(0, result.length);
- }
-
}
reader.close();
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/pom.xml
----------------------------------------------------------------------
diff --git a/blur-shell/pom.xml b/blur-shell/pom.xml
index fd7e92d..8834a3c 100644
--- a/blur-shell/pom.xml
+++ b/blur-shell/pom.xml
@@ -61,6 +61,13 @@
<build>
<finalName>blur-shell-${project.version}</finalName>
<plugins>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java b/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java
index 2b48374..d92ad80 100644
--- a/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java
+++ b/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java
@@ -40,13 +40,24 @@ public class ListRunningPlatformCommandsCommand extends Command {
if (args.length != 1) {
throw new CommandException("Invalid args: " + help());
}
-
List<String> commandStatusList = client.commandStatusList(0, Short.MAX_VALUE);
RunningSummary runningSummary = new RunningSummary();
for (String id : commandStatusList) {
- CommandStatus commandStatus = client.commandStatus(id);
+ CommandStatus commandStatus;
+ try {
+ commandStatus = client.commandStatus(id);
+ } catch (BlurException e) {
+ String message = e.getMessage();
+ if (message != null && message.startsWith("NOT_FOUND")) {
+ commandStatus = null;
+ } else {
+ throw e;
+ }
+ }
+ if (commandStatus == null) {
+ continue;
+ }
Map<String, Map<CommandStatusState, Long>> serverStateMap = commandStatus.getServerStateMap();
- out.println(serverStateMap);
Map<CommandStatusState, Long> summary = getSummary(serverStateMap);
if (summary.containsKey(CommandStatusState.RUNNING)) {
runningSummary.add(commandStatus, summary);
@@ -56,7 +67,7 @@ public class ListRunningPlatformCommandsCommand extends Command {
runningSummary.print(out);
}
- private Map<CommandStatusState, Long> getSummary(Map<String, Map<CommandStatusState, Long>> serverStateMap) {
+ public static Map<CommandStatusState, Long> getSummary(Map<String, Map<CommandStatusState, Long>> serverStateMap) {
Map<CommandStatusState, Long> map = new HashMap<CommandStatusState, Long>();
for (Map<CommandStatusState, Long> m : serverStateMap.values()) {
for (Entry<CommandStatusState, Long> e : m.entrySet()) {
@@ -95,18 +106,7 @@ public class ListRunningPlatformCommandsCommand extends Command {
String executionId = commandStatus.getExecutionId();
String commandName = commandStatus.getCommandName();
User user = commandStatus.getUser();
- _summary.add(Arrays.asList(executionId, commandName, user.getUsername(), toString(summary)));
- }
-
- private String toString(Map<CommandStatusState, Long> summary) {
- StringBuilder builder = new StringBuilder();
- for (Entry<CommandStatusState, Long> e : summary.entrySet()) {
- if (builder.length() != 0) {
- builder.append(',');
- }
- builder.append(e.getKey().name()).append(":").append(e.getValue());
- }
- return builder.toString();
+ _summary.add(Arrays.asList(executionId, commandName, user.getUsername(), toStringSummary(summary)));
}
public void print(PrintWriter out) {
@@ -158,4 +158,15 @@ public class ListRunningPlatformCommandsCommand extends Command {
return len;
}
}
+
+ public static String toStringSummary(Map<CommandStatusState, Long> summary) {
+ StringBuilder builder = new StringBuilder();
+ for (Entry<CommandStatusState, Long> e : summary.entrySet()) {
+ if (builder.length() != 0) {
+ builder.append(',');
+ }
+ builder.append(e.getKey().name()).append(":").append(e.getValue());
+ }
+ return builder.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/Main.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/Main.java b/blur-shell/src/main/java/org/apache/blur/shell/Main.java
index bf4b0b5..82d8a42 100644
--- a/blur-shell/src/main/java/org/apache/blur/shell/Main.java
+++ b/blur-shell/src/main/java/org/apache/blur/shell/Main.java
@@ -400,7 +400,7 @@ public class Main {
public static String[] shellCommands = { "help", "debug", "timed", "quit", "reset", "user", "whoami", "trace",
"trace-remove", "trace-list" };
public static String[] platformCommands = { "command-list", "command-exec", "command-desc", "command-running",
- "command-cancel" };
+ "command-cancel", "command-watch" };
public static String[] serverCommands = { "logger", "logger-reset", "remove-shard" };
private static class HelpCommand extends Command {
@@ -704,6 +704,7 @@ public class Main {
register(builder, new ImportDataCommand());
register(builder, new ListRunningPlatformCommandsCommand());
register(builder, new CancelPlatformCommandCommand());
+ register(builder, new WatchCommands());
commands = builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java b/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java
index 94783e8..31a08e8 100644
--- a/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java
+++ b/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java
@@ -289,7 +289,7 @@ public class TableDisplay implements Closeable {
width--;
}
}
-
+
private void buffer(Canvas canvas, String value, int width) {
canvas.append(value);
width -= getVisibleLength(value);
@@ -683,4 +683,5 @@ public class TableDisplay implements Closeable {
public void setStopReadingInput(boolean b) {
_stopReadingInput.set(true);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java b/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java
new file mode 100644
index 0000000..4b284e9
--- /dev/null
+++ b/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.shell;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import jline.console.ConsoleReader;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.CommandStatus;
+import org.apache.blur.thrift.generated.CommandStatusState;
+import org.apache.blur.thrift.generated.User;
+
+public class WatchCommands extends Command {
+
+ @Override
+ public void doit(PrintWriter out, Blur.Iface client, String[] args) throws CommandException, TException,
+ BlurException {
+ ConsoleReader reader = this.getConsoleReader();
+ try {
+ doitInternal(client, reader);
+ } catch (IOException e) {
+ if (Main.debug) {
+ e.printStackTrace();
+ }
+ throw new CommandException(e.getMessage());
+ } finally {
+ if (reader != null) {
+ reader.setPrompt(Main.PROMPT);
+ }
+ }
+ }
+
+ private static void doitInternal(Iface client, ConsoleReader reader) throws IOException, TException, CommandException {
+ TableDisplay tableDisplay = new TableDisplay(reader);
+ tableDisplay.setSeperator("|");
+ tableDisplay.setHeader(0, "id");
+ tableDisplay.setHeader(1, "command");
+ tableDisplay.setHeader(2, "user");
+ tableDisplay.setHeader(3, "summary");
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+ tableDisplay.addKeyHook(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (running) {
+ running.set(false);
+ running.notifyAll();
+ }
+ }
+ }, 'q');
+
+ try {
+ int maxL = 0;
+ while (running.get()) {
+
+ List<String> commandStatusList = client.commandStatusList(0, Short.MAX_VALUE);
+ List<String[]> display = new ArrayList<String[]>();
+ for (String id : commandStatusList) {
+ CommandStatus commandStatus;
+ try {
+ commandStatus = client.commandStatus(id);
+ } catch (BlurException e) {
+ String message = e.getMessage();
+ if (message != null && message.startsWith("NOT_FOUND")) {
+ commandStatus = null;
+ } else {
+ throw e;
+ }
+ }
+ if (commandStatus == null) {
+ continue;
+ }
+ Map<String, Map<CommandStatusState, Long>> serverStateMap = commandStatus.getServerStateMap();
+ Map<CommandStatusState, Long> summary = ListRunningPlatformCommandsCommand.getSummary(serverStateMap);
+ String executionId = commandStatus.getExecutionId();
+ String commandName = commandStatus.getCommandName();
+ User user = commandStatus.getUser();
+ if (summary.containsKey(CommandStatusState.RUNNING)) {
+ String stringSummary = ListRunningPlatformCommandsCommand.toStringSummary(summary);
+ display.add(new String[] { executionId, commandName, user.toString(), stringSummary });
+ } else if (summary.containsKey(CommandStatusState.INTERRUPTED)) {
+ display
+ .add(new String[] { executionId, commandName, user.toString(), CommandStatusState.INTERRUPTED.name() });
+ } else {
+ display.add(new String[] { executionId, commandName, user.toString(), CommandStatusState.COMPLETE.name() });
+ }
+ }
+
+ int l = 0;
+ for (String[] array : display) {
+ tableDisplay.set(0, l, array[0]);
+ tableDisplay.set(1, l, array[1]);
+ tableDisplay.set(2, l, array[2]);
+ tableDisplay.set(3, l, array[3]);
+ l++;
+ }
+ if (l > maxL) {
+ maxL = l;
+ }
+ Thread.sleep(3000);
+ }
+ } catch (InterruptedException e) {
+ if (Main.debug) {
+ e.printStackTrace();
+ }
+ throw new CommandException(e.getMessage());
+ } finally {
+ tableDisplay.close();
+ }
+ }
+
+ @Override
+ public String description() {
+ return "Watch commands execute.";
+ }
+
+ @Override
+ public String usage() {
+ return "";
+ }
+
+ @Override
+ public String name() {
+ return "command-watch";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java b/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java
index 3bc6737..fd617b6 100644
--- a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java
+++ b/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java
@@ -18,8 +18,8 @@ package org.apache.blur.lucene.codec;
*/
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.blur.trace.Trace;
import org.apache.blur.trace.Tracer;
@@ -41,25 +41,33 @@ import org.apache.lucene.util.packed.BlockPackedReader;
import org.apache.lucene.util.packed.MonotonicBlockPackedReader;
class DiskDocValuesProducer extends DocValuesProducer {
- private final Map<Integer,NumericEntry> numerics;
- private final Map<Integer,BinaryEntry> binaries;
- private final Map<Integer,NumericEntry> ords;
- private final Map<Integer,NumericEntry> ordIndexes;
+ private final Map<Integer, NumericEntry> numerics;
+ private final Map<Integer, BinaryEntry> binaries;
+ private final Map<Integer, NumericEntry> ords;
+ private final Map<Integer, NumericEntry> ordIndexes;
+ private final Map<Integer, BinaryDocValues> _binaryDocValuesCache;
+ private final Map<Integer, NumericDocValues> _numericDocValuesCache;
+ private final Map<Integer, SortedDocValues> _sortedDocValuesCache;
+ private final Map<Integer, SortedSetDocValues> _sortedSetDocValuesCache;
private final IndexInput data;
+ private final boolean _cache = true;
- DiskDocValuesProducer(SegmentReadState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
+ DiskDocValuesProducer(SegmentReadState state, String dataCodec, String dataExtension, String metaCodec,
+ String metaExtension) throws IOException {
String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
// read in the entries from the metadata file.
IndexInput in = state.directory.openInput(metaName, state.context);
boolean success = false;
try {
- CodecUtil.checkHeader(in, metaCodec,
- DiskDocValuesFormat.VERSION_START,
- DiskDocValuesFormat.VERSION_START);
- numerics = new HashMap<Integer,NumericEntry>();
- ords = new HashMap<Integer,NumericEntry>();
- ordIndexes = new HashMap<Integer,NumericEntry>();
- binaries = new HashMap<Integer,BinaryEntry>();
+ CodecUtil.checkHeader(in, metaCodec, DiskDocValuesFormat.VERSION_START, DiskDocValuesFormat.VERSION_START);
+ numerics = new ConcurrentHashMap<Integer, NumericEntry>();
+ ords = new ConcurrentHashMap<Integer, NumericEntry>();
+ ordIndexes = new ConcurrentHashMap<Integer, NumericEntry>();
+ binaries = new ConcurrentHashMap<Integer, BinaryEntry>();
+ _binaryDocValuesCache = new ConcurrentHashMap<Integer, BinaryDocValues>();
+ _numericDocValuesCache = new ConcurrentHashMap<Integer, NumericDocValues>();
+ _sortedDocValuesCache = new ConcurrentHashMap<Integer, SortedDocValues>();
+ _sortedSetDocValuesCache = new ConcurrentHashMap<Integer, SortedSetDocValues>();
readFields(in, state.fieldInfos);
success = true;
} finally {
@@ -69,14 +77,12 @@ class DiskDocValuesProducer extends DocValuesProducer {
IOUtils.closeWhileHandlingException(in);
}
}
-
+
String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
data = state.directory.openInput(dataName, state.context);
- CodecUtil.checkHeader(data, dataCodec,
- DiskDocValuesFormat.VERSION_START,
- DiskDocValuesFormat.VERSION_START);
+ CodecUtil.checkHeader(data, dataCodec, DiskDocValuesFormat.VERSION_START, DiskDocValuesFormat.VERSION_START);
}
-
+
private void readFields(IndexInput meta, FieldInfos infos) throws IOException {
int fieldNumber = meta.readVInt();
while (fieldNumber != -1) {
@@ -96,7 +102,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
}
BinaryEntry b = readBinaryEntry(meta);
binaries.put(fieldNumber, b);
-
+
if (meta.readVInt() != fieldNumber) {
throw new CorruptIndexException("sorted entry for field: " + fieldNumber + " is corrupt");
}
@@ -115,7 +121,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
}
BinaryEntry b = readBinaryEntry(meta);
binaries.put(fieldNumber, b);
-
+
if (meta.readVInt() != fieldNumber) {
throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
}
@@ -124,7 +130,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
}
NumericEntry n1 = readNumericEntry(meta);
ords.put(fieldNumber, n1);
-
+
if (meta.readVInt() != fieldNumber) {
throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt");
}
@@ -139,7 +145,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
fieldNumber = meta.readVInt();
}
}
-
+
static NumericEntry readNumericEntry(IndexInput meta) throws IOException {
NumericEntry entry = new NumericEntry();
entry.packedIntsVersion = meta.readVInt();
@@ -148,7 +154,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
entry.blockSize = meta.readVInt();
return entry;
}
-
+
static BinaryEntry readBinaryEntry(IndexInput meta) throws IOException {
BinaryEntry entry = new BinaryEntry();
entry.minLength = meta.readVInt();
@@ -165,15 +171,30 @@ class DiskDocValuesProducer extends DocValuesProducer {
@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
- NumericEntry entry = numerics.get(field.number);
- return getNumeric(entry);
+ NumericDocValues numericDocValues = _numericDocValuesCache.get(field.number);
+ if (numericDocValues != null) {
+ return numericDocValues;
+ }
+ synchronized (_numericDocValuesCache) {
+ numericDocValues = _numericDocValuesCache.get(field.number);
+ if (numericDocValues != null) {
+ return numericDocValues;
+ }
+ NumericEntry entry = numerics.get(field.number);
+ numericDocValues = newNumeric(entry);
+ if (_cache && numericDocValues != null) {
+ _numericDocValuesCache.put(field.number, numericDocValues);
+ }
+ return numericDocValues;
+ }
}
-
- LongNumericDocValues getNumeric(NumericEntry entry) throws IOException {
+
+ LongNumericDocValues newNumeric(NumericEntry entry) throws IOException {
final IndexInput data = this.data.clone();
data.seek(entry.offset);
- final BlockPackedReader reader = new BlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count, true);
+ final BlockPackedReader reader = new BlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count,
+ true);
return new LongNumericDocValues() {
@Override
public long get(long id) {
@@ -184,6 +205,24 @@ class DiskDocValuesProducer extends DocValuesProducer {
@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
+ BinaryDocValues binaryDocValues = _binaryDocValuesCache.get(field.number);
+ if (binaryDocValues != null) {
+ return binaryDocValues;
+ }
+ synchronized (_binaryDocValuesCache) {
+ binaryDocValues = _binaryDocValuesCache.get(field.number);
+ if (binaryDocValues != null) {
+ return binaryDocValues;
+ }
+ binaryDocValues = newBinary(field);
+ if (_cache && binaryDocValues != null) {
+ _binaryDocValuesCache.put(field.number, binaryDocValues);
+ }
+ return binaryDocValues;
+ }
+ }
+
+ private BinaryDocValues newBinary(FieldInfo field) throws IOException {
BinaryEntry bytes = binaries.get(field.number);
if (bytes.minLength == bytes.maxLength) {
return getFixedBinary(field, bytes);
@@ -191,20 +230,30 @@ class DiskDocValuesProducer extends DocValuesProducer {
return getVariableBinary(field, bytes);
}
}
-
+
private BinaryDocValues getFixedBinary(FieldInfo field, final BinaryEntry bytes) {
final IndexInput data = this.data.clone();
return new LongBinaryDocValues() {
+
+ private final ThreadLocal<IndexInput> in = new ThreadLocal<IndexInput>() {
+ @Override
+ protected IndexInput initialValue() {
+ return data.clone();
+ }
+ };
+
@Override
public void get(long id, BytesRef result) {
long address = bytes.offset + id * bytes.maxLength;
try {
- data.seek(address);
- // NOTE: we could have one buffer, but various consumers (e.g. FieldComparatorSource)
+ IndexInput indexInput = in.get();
+ indexInput.seek(address);
+ // NOTE: we could have one buffer, but various consumers (e.g.
+ // FieldComparatorSource)
// assume "they" own the bytes after calling this!
final byte[] buffer = new byte[bytes.maxLength];
- data.readBytes(buffer, 0, buffer.length);
+ indexInput.readBytes(buffer, 0, buffer.length);
result.bytes = buffer;
result.offset = 0;
result.length = buffer.length;
@@ -214,10 +263,10 @@ class DiskDocValuesProducer extends DocValuesProducer {
}
};
}
-
+
private BinaryDocValues getVariableBinary(FieldInfo field, final BinaryEntry bytes) throws IOException {
final IndexInput data = this.data.clone();
-
+
Tracer trace = Trace.trace("getSorted - BlockPackedReader - create");
final MonotonicBlockPackedReader addresses;
try {
@@ -227,17 +276,27 @@ class DiskDocValuesProducer extends DocValuesProducer {
trace.done();
}
return new LongBinaryDocValues() {
+
+ private final ThreadLocal<IndexInput> _input = new ThreadLocal<IndexInput>() {
+ @Override
+ protected IndexInput initialValue() {
+ return data.clone();
+ }
+ };
+
@Override
public void get(long id, BytesRef result) {
- long startAddress = bytes.offset + (id == 0 ? 0 : addresses.get(id-1));
+ long startAddress = bytes.offset + (id == 0 ? 0 : addresses.get(id - 1));
long endAddress = bytes.offset + addresses.get(id);
int length = (int) (endAddress - startAddress);
try {
- data.seek(startAddress);
- // NOTE: we could have one buffer, but various consumers (e.g. FieldComparatorSource)
+ IndexInput indexInput = _input.get();
+ indexInput.seek(startAddress);
+ // NOTE: we could have one buffer, but various consumers (e.g.
+ // FieldComparatorSource)
// assume "they" own the bytes after calling this!
final byte[] buffer = new byte[length];
- data.readBytes(buffer, 0, buffer.length);
+ indexInput.readBytes(buffer, 0, buffer.length);
result.bytes = buffer;
result.offset = 0;
result.length = length;
@@ -250,11 +309,29 @@ class DiskDocValuesProducer extends DocValuesProducer {
@Override
public SortedDocValues getSorted(FieldInfo field) throws IOException {
+ SortedDocValues sortedDocValues = _sortedDocValuesCache.get(field.number);
+ if (sortedDocValues != null) {
+ return sortedDocValues;
+ }
+ synchronized (_sortedDocValuesCache) {
+ sortedDocValues = _sortedDocValuesCache.get(field.number);
+ if (sortedDocValues != null) {
+ return sortedDocValues;
+ }
+ sortedDocValues = newSortedDocValues(field);
+ if (_cache && sortedDocValues != null) {
+ _sortedDocValuesCache.put(field.number, sortedDocValues);
+ }
+ return sortedDocValues;
+ }
+ }
+
+ private SortedDocValues newSortedDocValues(FieldInfo field) throws IOException {
final int valueCount = (int) binaries.get(field.number).count;
final BinaryDocValues binary = getBinary(field);
Tracer trace = Trace.trace("getSorted - BlockPackedReader - create");
final BlockPackedReader ordinals;
- try{
+ try {
NumericEntry entry = ords.get(field.number);
IndexInput data = this.data.clone();
data.seek(entry.offset);
@@ -283,14 +360,32 @@ class DiskDocValuesProducer extends DocValuesProducer {
@Override
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
+ SortedSetDocValues sortedSetDocValues = _sortedSetDocValuesCache.get(field.number);
+ if (sortedSetDocValues != null) {
+ return sortedSetDocValues;
+ }
+ synchronized (_sortedSetDocValuesCache) {
+ sortedSetDocValues = _sortedSetDocValuesCache.get(field.number);
+ if (sortedSetDocValues != null) {
+ return sortedSetDocValues;
+ }
+ sortedSetDocValues = newSortedSetDocValues(field);
+ if (_cache && sortedSetDocValues != null) {
+ _sortedSetDocValuesCache.put(field.number, sortedSetDocValues);
+ }
+ return sortedSetDocValues;
+ }
+ }
+
+ private SortedSetDocValues newSortedSetDocValues(FieldInfo field) throws IOException {
final long valueCount = binaries.get(field.number).count;
// we keep the byte[]s and list of ords on disk, these could be large
final LongBinaryDocValues binary = (LongBinaryDocValues) getBinary(field);
- final LongNumericDocValues ordinals = getNumeric(ords.get(field.number));
+ final LongNumericDocValues ordinals = newNumeric(ords.get(field.number));
Tracer trace = Trace.trace("getSortedSet - MonotonicBlockPackedReader - create");
final MonotonicBlockPackedReader ordIndex;
- try{
+ try {
NumericEntry entry = ordIndexes.get(field.number);
IndexInput data = this.data.clone();
data.seek(entry.offset);
@@ -302,7 +397,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
return new SortedSetDocValues() {
long offset;
long endOffset;
-
+
@Override
public long nextOrd() {
if (offset == endOffset) {
@@ -316,7 +411,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
@Override
public void setDocument(int docID) {
- offset = (docID == 0 ? 0 : ordIndex.get(docID-1));
+ offset = (docID == 0 ? 0 : ordIndex.get(docID - 1));
endOffset = ordIndex.get(docID);
}
@@ -336,7 +431,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
public void close() throws IOException {
data.close();
}
-
+
static class NumericEntry {
long offset;
@@ -344,7 +439,7 @@ class DiskDocValuesProducer extends DocValuesProducer {
long count;
int blockSize;
}
-
+
static class BinaryEntry {
long offset;
@@ -355,23 +450,23 @@ class DiskDocValuesProducer extends DocValuesProducer {
int packedIntsVersion;
int blockSize;
}
-
+
// internally we compose complex dv (sorted/sortedset) from other ones
static abstract class LongNumericDocValues extends NumericDocValues {
@Override
public final long get(int docID) {
return get((long) docID);
}
-
+
abstract long get(long id);
}
-
+
static abstract class LongBinaryDocValues extends BinaryDocValues {
@Override
public final void get(int docID, BytesRef result) {
- get((long)docID, result);
+ get((long) docID, result);
}
-
+
abstract void get(long id, BytesRef Result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
index 18b9eda..3803cc5 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
@@ -224,6 +224,7 @@ public class CacheIndexInput extends IndexInput {
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
ensureOpen();
+ LOOP:
while (len > 0) {
tryToFill();
int remaining = remaining();
@@ -232,8 +233,7 @@ public class CacheIndexInput extends IndexInput {
_cacheValue.read(_blockPosition, b, offset, length);
} catch (EvictionException e) {
releaseCache();
- readBytes(b, offset, len);
- return;
+ continue LOOP;
}
offset += length;
len -= length;
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java
index 2e8d245..269245a 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java
@@ -121,11 +121,14 @@ public abstract class MeterWrapper implements Closeable {
}
private static void register(String id, SimpleMeter meter, AtomicLong counter) {
- {
- _counterMap.putIfAbsent(id, new MeterWrapperCounter(meter));
- }
- {
- _counterMap.get(id).add(counter);
+ MeterWrapperCounter meterWrapperCounter = new MeterWrapperCounter(meter);
+ while (true) {
+ _counterMap.putIfAbsent(id, meterWrapperCounter);
+ MeterWrapperCounter wrapperCounter = _counterMap.get(id);
+ if (wrapperCounter != null) {
+ wrapperCounter.add(counter);
+ return;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
index e5ce99d..24eb7ca 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
@@ -38,7 +38,6 @@ public class DetachableCacheValue implements CacheValue {
}
private volatile CacheValue _baseCacheValue;
- private volatile boolean _evicted;
public DetachableCacheValue(CacheValue cacheValue) {
_baseCacheValue = cacheValue;
@@ -46,7 +45,6 @@ public class DetachableCacheValue implements CacheValue {
@Override
public CacheValue detachFromCache() {
- _evicted = true;
if (_baseCacheValue instanceof ByteArrayCacheValue) {
// already detached
return null;
@@ -64,8 +62,11 @@ public class DetachableCacheValue implements CacheValue {
@Override
public int length() throws EvictionException {
- checkEviction();
- return _baseCacheValue.length();
+ try {
+ return _baseCacheValue.length();
+ } catch (NullPointerException npe) {
+ throw new EvictionException();
+ }
}
@Override
@@ -75,20 +76,20 @@ public class DetachableCacheValue implements CacheValue {
@Override
public void read(int position, byte[] buf, int offset, int length) throws EvictionException {
- checkEviction();
- _baseCacheValue.read(position, buf, offset, length);
- }
-
- private void checkEviction() throws EvictionException {
- if (_evicted) {
+ try {
+ _baseCacheValue.read(position, buf, offset, length);
+ } catch (NullPointerException npe) {
throw new EvictionException();
}
}
@Override
public byte read(int position) throws EvictionException {
- checkEviction();
- return _baseCacheValue.read(position);
+ try {
+ return _baseCacheValue.read(position);
+ } catch (NullPointerException npe) {
+ throw new EvictionException();
+ }
}
@Override
@@ -100,20 +101,29 @@ public class DetachableCacheValue implements CacheValue {
@Override
public short readShort(int position) throws EvictionException {
- checkEviction();
- return _baseCacheValue.readShort(position);
+ try {
+ return _baseCacheValue.readShort(position);
+ } catch (NullPointerException npe) {
+ throw new EvictionException();
+ }
}
@Override
public int readInt(int position) throws EvictionException {
- checkEviction();
- return _baseCacheValue.readInt(position);
+ try {
+ return _baseCacheValue.readInt(position);
+ } catch (NullPointerException npe) {
+ throw new EvictionException();
+ }
}
@Override
public long readLong(int position) throws EvictionException {
- checkEviction();
- return _baseCacheValue.readLong(position);
+ try {
+ return _baseCacheValue.readLong(position);
+ } catch (NullPointerException npe) {
+ throw new EvictionException();
+ }
}
@Override
@@ -123,7 +133,7 @@ public class DetachableCacheValue implements CacheValue {
@Override
public boolean isEvicted() {
- return _evicted;
+ return _baseCacheValue == null;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java
index ac72eb9..34371e5 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java
@@ -44,7 +44,8 @@ public class SequentialReadControl implements Cloneable {
public SequentialReadControl clone() {
try {
SequentialReadControl control = (SequentialReadControl) super.clone();
- setup(_configuration, control);
+ // Setup too heavy for clones
+ // setup(_configuration, control);
return control;
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java
new file mode 100644
index 0000000..e5d495b
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java
@@ -0,0 +1,65 @@
+package org.apache.lucene.util.packed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+
+final class DirectPacked64SingleBlockReader extends PackedInts.ReaderImpl {
+
+ private final ThreadLocal<IndexInput> in;
+ private final long startPointer;
+ private final int valuesPerBlock;
+ private final long mask;
+
+ DirectPacked64SingleBlockReader(int bitsPerValue, int valueCount,
+ IndexInput input) {
+ super(valueCount, bitsPerValue);
+ this.in = new ThreadLocal<IndexInput>() {
+ @Override
+ protected IndexInput initialValue() {
+ return input.clone();
+ }
+ };
+ startPointer = input.getFilePointer();
+ valuesPerBlock = 64 / bitsPerValue;
+ mask = ~(~0L << bitsPerValue);
+ }
+
+ @Override
+ public long get(int index) {
+ final int blockOffset = index / valuesPerBlock;
+ final long skip = ((long) blockOffset) << 3;
+ try {
+ IndexInput indexInput = in.get();
+ indexInput.seek(startPointer + skip);
+
+ long block = indexInput.readLong();
+ final int offsetInBlock = index % valuesPerBlock;
+ return (block >>> (offsetInBlock * bitsPerValue)) & mask;
+ } catch (IOException e) {
+ throw new IllegalStateException("failed", e);
+ }
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java
new file mode 100644
index 0000000..9483a10
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java
@@ -0,0 +1,80 @@
+package org.apache.lucene.util.packed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+
+/* Reads directly from disk on each get */
+class DirectPackedReader extends PackedInts.ReaderImpl {
+ private final long startPointer;
+ private final ThreadLocal<IndexInput> in;
+
+ public DirectPackedReader(int bitsPerValue, int valueCount, IndexInput input) {
+ super(valueCount, bitsPerValue);
+ this.in = new ThreadLocal<IndexInput>() {
+ @Override
+ protected IndexInput initialValue() {
+ return input.clone();
+ }
+ };
+ startPointer = input.getFilePointer();
+ }
+
+ @Override
+ public long get(int index) {
+ final long majorBitPos = (long)index * bitsPerValue;
+ final long elementPos = majorBitPos >>> 3;
+ try {
+ IndexInput indexInput = in.get();
+ indexInput .seek(startPointer + elementPos);
+
+ final byte b0 = indexInput.readByte();
+ final int bitPos = (int) (majorBitPos & 7);
+ if (bitPos + bitsPerValue <= 8) {
+ // special case: all bits are in the first byte
+ return (b0 & ((1L << (8 - bitPos)) - 1)) >>> (8 - bitPos - bitsPerValue);
+ }
+
+ // take bits from the first byte
+ int remainingBits = bitsPerValue - 8 + bitPos;
+ long result = (b0 & ((1L << (8 - bitPos)) - 1)) << remainingBits;
+
+ // add bits from inner bytes
+ while (remainingBits >= 8) {
+ remainingBits -= 8;
+ result |= (indexInput.readByte() & 0xFFL) << remainingBits;
+ }
+
+ // take bits from the last byte
+ if (remainingBits > 0) {
+ result |= (indexInput.readByte() & 0xFFL) >>> (8 - remainingBits);
+ }
+
+ return result;
+ } catch (IOException ioe) {
+ throw new IllegalStateException("failed", ioe);
+ }
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 472481b..509c0f0 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -202,8 +202,11 @@ public class BlurConstants {
public static final String SHARED_MERGE_SCHEDULER_PREFIX = "shared-merge-scheduler";
public static final String BLUR_FILTER_ALIAS = "blur.filter.alias.";
-
+
public static final String HADOOP_CONF = "hadoop_conf.";
+ public static final String UPDATE_ROW = "_update_row_";
+ public static final String NEW_ROW = "_new_row_";
+ public static final String INTERNAL = "blur.internal";
static {
try {
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/deploy.sh
----------------------------------------------------------------------
diff --git a/deploy.sh b/deploy.sh
new file mode 100644
index 0000000..2731a85
--- /dev/null
+++ b/deploy.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+URL="<URL HERE>"
+REPO_ID="snapshots"
+
+#mvn install -D${PROFILE} -DskipTests
+#[ $? -eq 0 ] || exit $?;
+
+CUR_DIR=`pwd`
+for FILE in *; do
+ if [ -d $FILE ]
+ then
+ if [ -f $FILE/pom.xml ]
+ then
+ echo "#######################################"
+ echo "# Deploying $FILE"
+ echo "#######################################"
+
+ cd $FILE
+
+ VERSION=`mvn help:evaluate -Dexpression=project.version | grep -v "\[INFO\]" | grep -v "\[WARNING\]"`
+ ARTIFACT=`mvn help:evaluate -Dexpression=project.artifactId | grep -v "\[INFO\]" | grep -v "\[WARNING\]"`
+
+ JAR="target/${ARTIFACT}-${VERSION}.jar"
+ JAR_SOURCES="target/${ARTIFACT}-${VERSION}-sources.jar"
+ TESTS_JAR="target/${ARTIFACT}-${VERSION}-tests.jar"
+ if [ -f $JAR ]
+ then
+ if [ -f target/effective-pom.xml ]
+ then
+ echo "Args PWD=$PWD REPO_ID=${REPO_ID} URL=${URL} ARTIFACT=${ARTIFACT} VERSION=${VERSION}"
+ if [ -f $TESTS_JAR ]
+ then
+ mvn deploy:deploy-file -DrepositoryId=${REPO_ID} -Durl=${URL} -Dfile=$JAR -DpomFile=target/effective-pom.xml -Dtypes=jar -Dclassifiers=tests -Dfiles=$TESTS_JAR -Dsources=$JAR_SOURCES
+ else
+ mvn deploy:deploy-file -DrepositoryId=${REPO_ID} -Durl=${URL} -Dfile=$JAR -DpomFile=target/effective-pom.xml
+ fi
+ [ $? -eq 0 ] || exit $?;
+ else
+ echo "No effective-pom.xml to deploy, SKIPPING."
+ fi
+ fi
+ cd $CUR_DIR
+ fi
+ fi
+done
+