Posted to commits@phoenix.apache.org by je...@apache.org on 2014/02/08 08:43:00 UTC
[11/11] git commit: Port Phoenix to HBase 0.98
Port Phoenix to HBase 0.98
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/53f7d3ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/53f7d3ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/53f7d3ce
Branch: refs/heads/4.0.0
Commit: 53f7d3ce880f79b18a5a728be13f965e84c52e56
Parents: 214ad9e
Author: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Authored: Fri Feb 7 23:41:57 2014 -0800
Committer: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Committed: Fri Feb 7 23:43:34 2014 -0800
----------------------------------------------------------------------
phoenix-core/pom.xml | 121 +-
.../hbase/index/IndexLogRollSynchronizer.java | 14 +-
.../org/apache/hadoop/hbase/index/Indexer.java | 95 +-
.../hbase/index/builder/BaseIndexBuilder.java | 4 +-
.../hbase/index/builder/IndexBuildManager.java | 6 +-
.../hbase/index/builder/IndexBuilder.java | 4 +-
.../covered/CoveredColumnsIndexBuilder.java | 21 +-
.../hbase/index/covered/LocalTableState.java | 15 +-
.../hbase/index/covered/data/IndexMemStore.java | 18 +-
.../hbase/index/covered/data/LocalTable.java | 5 +-
.../example/CoveredColumnIndexCodec.java | 1 -
.../covered/example/CoveredColumnIndexer.java | 11 +-
.../filter/ApplyAndFilterDeletesFilter.java | 36 +-
...olumnTrackingNextLargestTimestampFilter.java | 16 +-
.../index/covered/filter/FamilyOnlyFilter.java | 8 +-
.../covered/filter/MaxTimestampFilter.java | 22 +-
.../covered/filter/NewerTimestampFilter.java | 12 +-
.../covered/update/IndexUpdateManager.java | 11 +-
.../hbase/index/parallel/ThreadPoolManager.java | 1 -
.../index/scanner/FilteredKeyValueScanner.java | 2 -
.../index/table/CoprocessorHTableFactory.java | 3 +-
.../hbase/index/util/IndexManagementUtil.java | 7 +-
.../hadoop/hbase/index/wal/IndexedKeyValue.java | 73 +-
.../hadoop/hbase/index/wal/KeyValueCodec.java | 11 +-
.../regionserver/wal/IndexedHLogReader.java | 119 +-
.../hbase/regionserver/wal/IndexedWALEdit.java | 91 -
.../regionserver/wal/IndexedWALEditCodec.java | 40 +-
.../apache/phoenix/cache/ServerCacheClient.java | 73 +-
.../cache/aggcache/SpillableGroupByCache.java | 8 +-
.../apache/phoenix/client/ClientKeyValue.java | 133 +-
.../apache/phoenix/compile/DeleteCompiler.java | 5 +-
.../apache/phoenix/compile/UpsertCompiler.java | 5 +-
.../phoenix/coprocessor/BaseRegionScanner.java | 17 +-
.../GroupedAggregateRegionObserver.java | 26 +-
.../coprocessor/HashJoinRegionScanner.java | 47 +-
.../coprocessor/MetaDataEndpointImpl.java | 1352 ++--
.../phoenix/coprocessor/MetaDataProtocol.java | 167 +-
.../phoenix/coprocessor/ScanRegionObserver.java | 47 +-
.../coprocessor/SequenceRegionObserver.java | 86 +-
.../coprocessor/ServerCachingEndpointImpl.java | 98 +-
.../coprocessor/ServerCachingProtocol.java | 3 +-
.../UngroupedAggregateRegionObserver.java | 95 +-
.../coprocessor/generated/MetaDataProtos.java | 7135 ++++++++++++++++++
.../coprocessor/generated/PTableProtos.java | 5315 +++++++++++++
.../generated/ServerCacheFactoryProtos.java | 568 ++
.../generated/ServerCachingProtos.java | 3447 +++++++++
.../apache/phoenix/execute/MutationState.java | 10 +-
.../expression/KeyValueColumnExpression.java | 1 -
.../DistinctValueWithCountServerAggregator.java | 4 +-
.../phoenix/filter/BooleanExpressionFilter.java | 11 +-
.../MultiCFCQKeyValueComparisonFilter.java | 11 +
.../filter/MultiCQKeyValueComparisonFilter.java | 12 +
.../filter/MultiKeyValueComparisonFilter.java | 8 +-
.../phoenix/filter/RowKeyComparisonFilter.java | 15 +-
.../SingleCFCQKeyValueComparisonFilter.java | 13 +-
.../SingleCQKeyValueComparisonFilter.java | 11 +
.../filter/SingleKeyValueComparisonFilter.java | 11 +-
.../apache/phoenix/filter/SkipScanFilter.java | 25 +-
.../apache/phoenix/index/IndexMaintainer.java | 41 +-
.../phoenix/index/PhoenixIndexBuilder.java | 10 +-
.../index/PhoenixIndexFailurePolicy.java | 44 +-
.../iterate/MappedByteBufferSortedQueue.java | 5 +-
.../iterate/RegionScannerResultIterator.java | 6 +-
.../phoenix/iterate/SpoolingResultIterator.java | 4 +-
.../java/org/apache/phoenix/job/JobManager.java | 10 +-
.../apache/phoenix/join/HashCacheFactory.java | 2 +-
.../org/apache/phoenix/join/ScanProjector.java | 5 +-
.../phoenix/map/reduce/CSVBulkLoader.java | 3 -
.../apache/phoenix/optimize/QueryOptimizer.java | 2 +-
.../apache/phoenix/protobuf/ProtobufUtil.java | 133 +
.../query/ConnectionQueryServicesImpl.java | 260 +-
.../query/ConnectionlessQueryServicesImpl.java | 12 +-
.../phoenix/query/HConnectionFactory.java | 7 +-
.../phoenix/query/QueryServicesOptions.java | 4 +-
.../apache/phoenix/schema/DelegateColumn.java | 10 -
.../apache/phoenix/schema/MetaDataClient.java | 4 +-
.../java/org/apache/phoenix/schema/PColumn.java | 4 +-
.../org/apache/phoenix/schema/PColumnImpl.java | 98 +-
.../java/org/apache/phoenix/schema/PTable.java | 2 +-
.../org/apache/phoenix/schema/PTableImpl.java | 286 +-
.../org/apache/phoenix/schema/Sequence.java | 73 +-
.../apache/phoenix/schema/stat/PTableStats.java | 3 +-
.../phoenix/schema/stat/PTableStatsImpl.java | 20 +-
.../schema/tuple/MultiKeyValueTuple.java | 19 +-
.../org/apache/phoenix/schema/tuple/Tuple.java | 6 +-
.../java/org/apache/phoenix/util/CSVLoader.java | 1 -
.../java/org/apache/phoenix/util/IndexUtil.java | 22 +-
.../org/apache/phoenix/util/KeyValueUtil.java | 39 +-
.../org/apache/phoenix/util/MetaDataUtil.java | 23 +-
.../org/apache/phoenix/util/PhoenixRuntime.java | 7 +-
.../org/apache/phoenix/util/ResultUtil.java | 29 +-
.../org/apache/phoenix/util/SchemaUtil.java | 2 -
.../org/apache/phoenix/util/ServerUtil.java | 16 +-
.../java/org/apache/phoenix/util/TupleUtil.java | 13 +-
.../hadoop/hbase/index/IndexTestingUtils.java | 4 +-
.../TestFailForUnsupportedHBaseVersions.java | 2 +
.../TestEndToEndCoveredColumnsIndexBuilder.java | 26 +-
.../index/covered/TestLocalTableState.java | 13 +-
.../index/covered/data/TestIndexMemStore.java | 4 +-
.../example/TestCoveredColumnIndexCodec.java | 16 +-
.../example/TestEndToEndCoveredIndexing.java | 2 +
.../TestEndtoEndIndexingWithCompression.java | 7 +-
.../covered/example/TestFailWithoutRetries.java | 2 +
.../filter/TestApplyAndFilterDeletesFilter.java | 8 +-
.../index/util/TestIndexManagementUtil.java | 6 +-
.../index/write/TestWALRecoveryCaching.java | 7 +-
.../recovery/TestPerRegionIndexWriteCache.java | 87 +-
.../wal/TestReadWriteKeyValuesWithCodec.java | 48 +-
...ALReplayWithIndexWritesAndCompressedWAL.java | 28 +-
...exWritesAndUncompressedWALInHBase_094_9.java | 2 +
.../phoenix/client/TestClientKeyValueLocal.java | 5 +-
.../apache/phoenix/end2end/AlterTableTest.java | 4 -
.../phoenix/end2end/NativeHBaseTypesTest.java | 2 +-
.../phoenix/end2end/index/IndexTestUtil.java | 14 +-
.../phoenix/filter/SkipScanFilterTest.java | 11 +-
.../iterate/AggregateResultScannerTest.java | 7 -
.../java/org/apache/phoenix/query/BaseTest.java | 2 +
.../java/org/apache/phoenix/util/TestUtil.java | 36 +-
phoenix-flume/pom.xml | 47 +-
phoenix-protocol/README.txt | 10 +
phoenix-protocol/src/main/MetaDataService.proto | 118 +
phoenix-protocol/src/main/PTable.proto | 72 +
.../src/main/ServerCacheFactory.proto | 29 +
.../src/main/ServerCachingService.proto | 61 +
phoenix-protocol/src/main/build-proto.sh | 37 +
pom.xml | 111 +-
126 files changed, 19574 insertions(+), 2005 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 0e2b8e5..5cc7804 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -213,23 +213,11 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- </dependency>
<!-- Findbugs Annotation -->
<dependency>
<groupId>net.sourceforge.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>
-
- <!-- Test Dependencies -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <type>test-jar</type>
- </dependency>
- <!-- Needed by HBase to run the minicluster -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
@@ -249,12 +237,37 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf-java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ <version>${jruby.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
</dependencies>
<profiles>
@@ -268,14 +281,60 @@
</property>
</activation>
<dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase-hadoop1.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase-hadoop1.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-protocol</artifactId>
+ <version>${hbase-hadoop1.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase-hadoop1.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop-one.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.kosmosfs</groupId>
+ <artifactId>kfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>oro</groupId>
+ <artifactId>oro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>${hadoop-one.version}</version>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
@@ -290,6 +349,26 @@
</activation>
<dependencies>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase-hadoop2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase-hadoop2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-protocol</artifactId>
+ <version>${hbase-hadoop2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase-hadoop2.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
index ca61221..904612f 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
@@ -71,6 +71,7 @@ public class IndexLogRollSynchronizer implements WALActionsListener {
private static final Log LOG = LogFactory.getLog(IndexLogRollSynchronizer.class);
private WriteLock logArchiveLock;
+ private boolean lockAcquired = false;
public IndexLogRollSynchronizer(WriteLock logWriteLock){
this.logArchiveLock = logWriteLock;
@@ -81,12 +82,21 @@ public class IndexLogRollSynchronizer implements WALActionsListener {
public void preLogArchive(Path oldPath, Path newPath) throws IOException {
//take a write lock on the index - any pending index updates will complete before we finish
LOG.debug("Taking INDEX_UPDATE writelock");
- logArchiveLock.lock();
- LOG.debug("Got the INDEX_UPDATE writelock");
+ try {
+ logArchiveLock.lockInterruptibly();
+ lockAcquired = true;
+ } catch (InterruptedException e) {
+ LOG.info("Acquiring lock got interrupted!");
+ Thread.currentThread().interrupt();
+ }
+ if (lockAcquired) {
+ LOG.debug("Got the INDEX_UPDATE writelock");
+ }
}
@Override
public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+ if (!lockAcquired) return;
// done archiving the logs, any WAL updates will be replayed on failure
LOG.debug("Releasing INDEX_UPDATE writelock");
logArchiveLock.unlock();
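For context, a minimal standalone sketch of the interruptible-lock pattern the hunk above switches to. This is plain java.util.concurrent, not Phoenix code; the class and method names are placeholders for illustration only:

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

// Placeholder class illustrating lockInterruptibly() with the interrupt flag restored.
class ArchiveLockExample {
  private final WriteLock lock = new ReentrantReadWriteLock().writeLock();
  private boolean lockAcquired = false;

  void beforeArchive() {
    try {
      // Block until the write lock is free, but stay responsive to interruption.
      lock.lockInterruptibly();
      lockAcquired = true;
    } catch (InterruptedException e) {
      // Preserve the interrupt status so callers further up the stack can react to it.
      Thread.currentThread().interrupt();
    }
  }

  void afterArchive() {
    // Only release the lock if beforeArchive() actually acquired it.
    if (lockAcquired) {
      lock.unlock();
      lockAcquired = false;
    }
  }
}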
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
index fe2852b..aa9df58 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
@@ -167,7 +168,7 @@ public class Indexer extends BaseRegionObserver {
this.builder = new IndexBuildManager(env);
// get a reference to the WAL
- log = env.getRegionServerServices().getWAL();
+ log = env.getRegionServerServices().getWAL(null);
// add a synchronizer so we don't archive a WAL that we need
log.registerWALActionsListener(new IndexLogRollSynchronizer(INDEX_READ_WRITE_LOCK.writeLock()));
@@ -218,9 +219,9 @@ public class Indexer extends BaseRegionObserver {
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
- final WALEdit edit, final boolean writeToWAL) throws IOException {
+ final WALEdit edit, final Durability durability) throws IOException {
if (this.disabled) {
- super.prePut(c, put, edit, writeToWAL);
+ super.prePut(c, put, edit, durability);
return;
}
// just have to add a batch marker to the WALEdit so we get the edit again in the batch
@@ -230,13 +231,13 @@ public class Indexer extends BaseRegionObserver {
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
- WALEdit edit, boolean writeToWAL) throws IOException {
+ WALEdit edit, final Durability durability) throws IOException {
if (this.disabled) {
- super.preDelete(e, delete, edit, writeToWAL);
+ super.preDelete(e, delete, edit, durability);
return;
}
try {
- preDeleteWithExceptions(e, delete, edit, writeToWAL);
+ preDeleteWithExceptions(e, delete, edit, durability);
return;
} catch (Throwable t) {
rethrowIndexingException(t);
@@ -246,7 +247,7 @@ public class Indexer extends BaseRegionObserver {
}
public void preDeleteWithExceptions(ObserverContext<RegionCoprocessorEnvironment> e,
- Delete delete, WALEdit edit, boolean writeToWAL) throws Exception {
+ Delete delete, WALEdit edit, final Durability durability) throws Exception {
// if we are making the update as part of a batch, we need to add in a batch marker so the WAL
// is retained
if (this.builder.getBatchId(delete) != null) {
@@ -257,14 +258,14 @@ public class Indexer extends BaseRegionObserver {
// get the mapping for index column -> target index table
Collection<Pair<Mutation, byte[]>> indexUpdates = this.builder.getIndexUpdate(delete);
- if (doPre(indexUpdates, edit, writeToWAL)) {
+ if (doPre(indexUpdates, edit, durability)) {
takeUpdateLock("delete");
}
}
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
super.preBatchMutate(c, miniBatchOp);
return;
@@ -281,12 +282,13 @@ public class Indexer extends BaseRegionObserver {
@SuppressWarnings("deprecation")
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws Throwable {
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
// first group all the updates for a single row into a single update to be processed
Map<ImmutableBytesPtr, MultiMutation> mutations =
new HashMap<ImmutableBytesPtr, MultiMutation>();
- boolean durable = false;
+
+ Durability durability = Durability.SKIP_WAL;
for (int i = 0; i < miniBatchOp.size(); i++) {
// remove the batch keyvalue marker - its added for all puts
WALEdit edit = miniBatchOp.getWalEdit(i);
@@ -294,11 +296,13 @@ public class Indexer extends BaseRegionObserver {
// we could check is indexing is enable for the mutation in prePut and then just skip this
// after checking here, but this saves us the checking again.
if (edit != null) {
- KeyValue kv = edit.getKeyValues().remove(0);
- assert kv == BATCH_MARKER : "Expected batch marker from the WALEdit, but got: " + kv;
+ KeyValue kv = edit.getKeyValues().get(0);
+ if (kv == BATCH_MARKER) {
+ // remove batch marker from the WALEdit
+ edit.getKeyValues().remove(0);
+ }
}
- Pair<Mutation, Integer> op = miniBatchOp.getOperation(i);
- Mutation m = op.getFirst();
+ Mutation m = miniBatchOp.getOperation(i);
// skip this mutation if we aren't enabling indexing
// unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
// should be indexed, which means we need to expose another method on the builder. Such is the
@@ -308,8 +312,8 @@ public class Indexer extends BaseRegionObserver {
}
// figure out if this is batch is durable or not
- if(!durable){
- durable = m.getDurability() != Durability.SKIP_WAL;
+ if (m.getDurability().ordinal() > durability.ordinal()) {
+ durability = m.getDurability();
}
// add the mutation to the batch set
@@ -317,7 +321,7 @@ public class Indexer extends BaseRegionObserver {
MultiMutation stored = mutations.get(row);
// we haven't seen this row before, so add it
if (stored == null) {
- stored = new MultiMutation(row, m.getWriteToWAL());
+ stored = new MultiMutation(row);
mutations.put(row, stored);
}
stored.addAll(m);
@@ -336,7 +340,7 @@ public class Indexer extends BaseRegionObserver {
Collection<Pair<Mutation, byte[]>> indexUpdates =
this.builder.getIndexUpdate(miniBatchOp, mutations.values());
// write them
- if (doPre(indexUpdates, edit, durable)) {
+ if (doPre(indexUpdates, edit, durability)) {
takeUpdateLock("batch mutation");
}
}
@@ -372,9 +376,8 @@ public class Indexer extends BaseRegionObserver {
private ImmutableBytesPtr rowKey;
- public MultiMutation(ImmutableBytesPtr rowkey, boolean writeToWal) {
+ public MultiMutation(ImmutableBytesPtr rowkey) {
this.rowKey = rowkey;
- this.writeToWAL = writeToWal;
}
/**
@@ -383,9 +386,9 @@ public class Indexer extends BaseRegionObserver {
@SuppressWarnings("deprecation")
public void addAll(Mutation stored) {
// add all the kvs
- for (Entry<byte[], List<KeyValue>> kvs : stored.getFamilyMap().entrySet()) {
+ for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) {
byte[] family = kvs.getKey();
- List<KeyValue> list = getKeyValueList(family, kvs.getValue().size());
+ List<Cell> list = getKeyValueList(family, kvs.getValue().size());
list.addAll(kvs.getValue());
familyMap.put(family, list);
}
@@ -396,15 +399,12 @@ public class Indexer extends BaseRegionObserver {
this.setAttribute(attrib.getKey(), attrib.getValue());
}
}
- if (stored.getWriteToWAL()) {
- this.writeToWAL = true;
- }
}
- private List<KeyValue> getKeyValueList(byte[] family, int hint) {
- List<KeyValue> list = familyMap.get(family);
+ private List<Cell> getKeyValueList(byte[] family, int hint) {
+ List<Cell> list = familyMap.get(family);
if (list == null) {
- list = new ArrayList<KeyValue>(hint);
+ list = new ArrayList<Cell>(hint);
}
return list;
}
@@ -423,16 +423,6 @@ public class Indexer extends BaseRegionObserver {
public boolean equals(Object o) {
return o == null ? false : o.hashCode() == this.hashCode();
}
-
- @Override
- public void readFields(DataInput arg0) throws IOException {
- throw new UnsupportedOperationException("MultiMutations cannot be read/written");
- }
-
- @Override
- public void write(DataOutput arg0) throws IOException {
- throw new UnsupportedOperationException("MultiMutations cannot be read/written");
- }
}
/**
@@ -441,7 +431,7 @@ public class Indexer extends BaseRegionObserver {
* @throws IOException
*/
private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
- final boolean writeToWAL) throws IOException {
+ final Durability durability) throws IOException {
// no index updates, so we are done
if (indexUpdates == null || indexUpdates.size() == 0) {
return false;
@@ -449,7 +439,7 @@ public class Indexer extends BaseRegionObserver {
// if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index
// update right away
- if (!writeToWAL) {
+ if (durability == Durability.SKIP_WAL) {
try {
this.writer.write(indexUpdates);
return false;
@@ -469,27 +459,27 @@ public class Indexer extends BaseRegionObserver {
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
- boolean writeToWAL) throws IOException {
+ final Durability durability) throws IOException {
if (this.disabled) {
- super.postPut(e, put, edit, writeToWAL);
+ super.postPut(e, put, edit, durability);
return;
}
- doPost(edit, put, writeToWAL);
+ doPost(edit, put, durability);
}
@Override
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
- WALEdit edit, boolean writeToWAL) throws IOException {
+ WALEdit edit, final Durability durability) throws IOException {
if (this.disabled) {
- super.postDelete(e, delete, edit, writeToWAL);
+ super.postDelete(e, delete, edit, durability);
return;
}
- doPost(edit,delete, writeToWAL);
+ doPost(edit, delete, durability);
}
@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
super.postBatchMutate(c, miniBatchOp);
return;
@@ -498,9 +488,9 @@ public class Indexer extends BaseRegionObserver {
// noop for the rest of the indexer - its handled by the first call to put/delete
}
- private void doPost(WALEdit edit, Mutation m, boolean writeToWAL) throws IOException {
+ private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
try {
- doPostWithExceptions(edit, m, writeToWAL);
+ doPostWithExceptions(edit, m, durability);
return;
} catch (Throwable e) {
rethrowIndexingException(e);
@@ -509,9 +499,10 @@ public class Indexer extends BaseRegionObserver {
"Somehow didn't complete the index update, but didn't return succesfully either!");
}
- private void doPostWithExceptions(WALEdit edit, Mutation m, boolean writeToWAL) throws Exception {
+ private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
+ throws Exception {
//short circuit, if we don't need to do any work
- if (!writeToWAL || !this.builder.isEnabled(m)) {
+ if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
// already did the index update in prePut, so we are done
return;
}
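As a quick orientation to the 0.98 observer API used throughout the hunks above, here is a minimal illustrative sketch (not part of this commit) of a BaseRegionObserver hook that receives a Durability enum instead of the old writeToWAL boolean:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Illustrative observer: HBase 0.98 passes Durability instead of a writeToWAL flag.
public class DurabilityAwareObserver extends BaseRegionObserver {
  @Override
  public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
      WALEdit edit, Durability durability) throws IOException {
    // SKIP_WAL replaces the old writeToWAL == false case.
    if (durability == Durability.SKIP_WAL) {
      return; // nothing will reach the WAL for this mutation
    }
    // ... pre-write work (e.g. index maintenance) would go here ...
  }
}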
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
index bbeae31..8c9a777 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
@@ -54,12 +54,12 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
}
@Override
- public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
// noop
}
@Override
- public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+ public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
// noop
}
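For reference, a small illustrative sketch (not from the commit) of iterating the 0.98-style MiniBatchOperationInProgress<Mutation>, where getOperation(i) returns the Mutation directly rather than a Pair<Mutation, Integer>; the helper class name is a placeholder:

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

// Illustrative helper: find the strongest durability requested in a batch.
class BatchDurability {
  static Durability strongest(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
    Durability result = Durability.SKIP_WAL;
    for (int i = 0; i < miniBatchOp.size(); i++) {
      // 0.98: the batch holds Mutations directly, no Pair unwrapping needed.
      Mutation m = miniBatchOp.getOperation(i);
      if (m.getDurability().ordinal() > result.ordinal()) {
        result = m.getDurability();
      }
    }
    return result;
  }
}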
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
index 833f142..61fc90d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
@@ -116,7 +116,7 @@ public class IndexBuildManager implements Stoppable {
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
// notify the delegate that we have started processing a batch
this.delegate.batchStarted(miniBatchOp);
@@ -178,11 +178,11 @@ public class IndexBuildManager implements Stoppable {
return delegate.getIndexUpdateForFilteredRows(filtered);
}
- public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+ public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
delegate.batchCompleted(miniBatchOp);
}
- public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp)
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp)
throws IOException {
delegate.batchStarted(miniBatchOp);
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
index e23ea3f..e92edab 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
@@ -107,7 +107,7 @@ public interface IndexBuilder extends Stoppable {
* Notification that a batch of updates has successfully been written.
* @param miniBatchOp the full batch operation that was written
*/
- public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp);
+ public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp);
/**
* Notification that a batch has been started.
@@ -118,7 +118,7 @@ public interface IndexBuilder extends Stoppable {
* @param miniBatchOp the full batch operation to be written
* @throws IOException
*/
- public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
/**
* This allows the codec to dynamically change whether or not indexing should take place for a
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
index 422a9ec..ce5efc5 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
@@ -33,23 +33,25 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
import org.apache.hadoop.hbase.index.builder.BaseIndexBuilder;
import org.apache.hadoop.hbase.index.covered.data.LocalHBaseState;
import org.apache.hadoop.hbase.index.covered.data.LocalTable;
import org.apache.hadoop.hbase.index.covered.update.ColumnTracker;
import org.apache.hadoop.hbase.index.covered.update.IndexUpdateManager;
import org.apache.hadoop.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
/**
* Build covered indexes for phoenix updates.
@@ -148,8 +150,9 @@ public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
*/
protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
Map<Long, Batch> batches = new HashMap<Long, Batch>();
- for (List<KeyValue> family : m.getFamilyMap().values()) {
- createTimestampBatchesFromKeyValues(family, batches);
+ for (List<Cell> family : m.getFamilyCellMap().values()) {
+ List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
+ createTimestampBatchesFromKeyValues(familyKVs, batches);
}
// sort the batches
List<Batch> sorted = new ArrayList<Batch>(batches.values());
@@ -420,7 +423,7 @@ public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
// We have to figure out which kind of delete it is, since we need to do different things if its
// a general (row) delete, versus a delete of just a single column or family
- Map<byte[], List<KeyValue>> families = d.getFamilyMap();
+ Map<byte[], List<Cell>> families = d.getFamilyCellMap();
/*
* Option 1: its a row delete marker, so we just need to delete the most recent state for each
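As a side note, a minimal illustrative sketch of the getFamilyCellMap()/KeyValueUtil bridging pattern the hunk above adopts to feed the new Cell interface into existing KeyValue-based code; the helper class is hypothetical:

import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;

// Illustrative helper: walk a mutation's cells and hand KeyValues to legacy code.
class CellBridge {
  static void visitAsKeyValues(Mutation m) {
    // 0.98: getFamilyCellMap() returns Cells; ensureKeyValues() converts them back.
    for (List<Cell> familyCells : m.getFamilyCellMap().values()) {
      List<KeyValue> kvs = KeyValueUtil.ensureKeyValues(familyCells);
      for (KeyValue kv : kvs) {
        // ... existing KeyValue-based processing would go here ...
      }
    }
  }
}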
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
index f6419f2..ce21135 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
@@ -28,13 +28,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.util.Pair;
-
import org.apache.hadoop.hbase.index.covered.data.IndexMemStore;
import org.apache.hadoop.hbase.index.covered.data.LocalHBaseState;
import org.apache.hadoop.hbase.index.covered.update.ColumnReference;
@@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.index.covered.update.ColumnTracker;
import org.apache.hadoop.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.hadoop.hbase.index.scanner.Scanner;
import org.apache.hadoop.hbase.index.scanner.ScannerBuilder;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Manage the state of the HRegion's view of the table, for the single row.
@@ -175,7 +176,7 @@ public class LocalTableState implements TableState {
public Result getCurrentRowState() {
KeyValueScanner scanner = this.memstore.getScanner();
- List<KeyValue> kvs = new ArrayList<KeyValue>();
+ List<Cell> kvs = new ArrayList<Cell>();
while (scanner.peek() != null) {
try {
kvs.add(scanner.next());
@@ -184,7 +185,7 @@ public class LocalTableState implements TableState {
throw new RuntimeException("Local MemStore threw IOException!");
}
}
- return new Result(kvs);
+ return Result.create(kvs);
}
/**
@@ -192,8 +193,8 @@ public class LocalTableState implements TableState {
* @param pendingUpdate update to apply
*/
public void addUpdateForTesting(Mutation pendingUpdate) {
- for (Map.Entry<byte[], List<KeyValue>> e : pendingUpdate.getFamilyMap().entrySet()) {
- List<KeyValue> edits = e.getValue();
+ for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
+ List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
addUpdate(edits);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
index e2cac10..5cee11a 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.index.covered.data;
+import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
@@ -26,7 +27,7 @@ import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.IndexKeyValueSkipListSet;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -87,12 +88,12 @@ public class IndexMemStore implements KeyValueStore {
*/
public static final Comparator<KeyValue> COMPARATOR = new Comparator<KeyValue>() {
- private final KeyComparator rawcomparator = new KeyComparator();
+ private final KVComparator rawcomparator = new KVComparator();
@Override
public int compare(final KeyValue left, final KeyValue right) {
- return rawcomparator.compare(left.getBuffer(), left.getOffset() + KeyValue.ROW_OFFSET,
- left.getKeyLength(), right.getBuffer(), right.getOffset() + KeyValue.ROW_OFFSET,
+ return rawcomparator.compareFlatKey(left.getRowArray(), left.getOffset() + KeyValue.ROW_OFFSET,
+ left.getKeyLength(), right.getRowArray(), right.getOffset() + KeyValue.ROW_OFFSET,
right.getKeyLength());
}
};
@@ -140,7 +141,8 @@ public class IndexMemStore implements KeyValueStore {
}
private String toString(KeyValue kv) {
- return kv.toString() + "/value=" + Bytes.toString(kv.getValue());
+ return kv.toString() + "/value=" +
+ Bytes.toString(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
}
@Override
@@ -159,7 +161,7 @@ public class IndexMemStore implements KeyValueStore {
public KeyValueScanner getScanner() {
return new MemStoreScanner();
}
-
+
/*
* MemStoreScanner implements the KeyValueScanner. It lets the caller scan the contents of a
* memstore -- both current map and snapshot. This behaves as if it were a real scanner but does
@@ -306,14 +308,13 @@ public class IndexMemStore implements KeyValueStore {
public long getSequenceID() {
return Long.MAX_VALUE;
}
-
+
@Override
public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
throw new UnsupportedOperationException(this.getClass().getName()
+ " doesn't support checking to see if it should use a scanner!");
}
- /*
@Override
public boolean backwardSeek(KeyValue arg0) throws IOException {
throw new UnsupportedOperationException();
@@ -328,6 +329,5 @@ public class IndexMemStore implements KeyValueStore {
public boolean seekToPreviousRow(KeyValue arg0) throws IOException {
throw new UnsupportedOperationException();
}
- */
}
}
\ No newline at end of file
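For orientation, a tiny illustrative sketch (not from the commit) of the 0.98 KVComparator that replaces the old KeyComparator, showing both the object-level and the flat-key comparison used in the hunk above:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative comparison of two KeyValues under the 0.98 KVComparator.
class FlatKeyCompareExample {
  public static void main(String[] args) {
    KeyValue a = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
    KeyValue b = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v2"));

    KVComparator comparator = new KVComparator();
    // Object-level comparison (row, family, qualifier, timestamp, type).
    int byObject = comparator.compare(a, b);
    // Flat-key comparison over the serialized key bytes, as in the hunk above.
    int byFlatKey = comparator.compareFlatKey(a.getBuffer(), a.getOffset() + KeyValue.ROW_OFFSET,
        a.getKeyLength(), b.getBuffer(), b.getOffset() + KeyValue.ROW_OFFSET, b.getKeyLength());
    System.out.println(byObject + " " + byFlatKey); // both negative: "row1" sorts before "row2"
  }
}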
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
index 52aa851..d2d99e6 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
@@ -63,11 +64,11 @@ public class LocalTable implements LocalHBaseState {
s.setStopRow(row);
HRegion region = this.env.getRegion();
RegionScanner scanner = region.getScanner(s);
- List<KeyValue> kvs = new ArrayList<KeyValue>(1);
+ List<Cell> kvs = new ArrayList<Cell>(1);
boolean more = scanner.next(kvs);
assert !more : "Got more than one result when scanning" + " a single row in the primary table!";
- Result r = new Result(kvs);
+ Result r = Result.create(kvs);
scanner.close();
return r;
}
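As a quick reference, a minimal illustrative sketch of the 0.98 pattern used above: scanning into a List<Cell> and wrapping it with Result.create() instead of the removed new Result(List<KeyValue>) constructor; the helper is hypothetical:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

// Illustrative helper: read one row's cells from a RegionScanner and build a Result.
class SingleRowRead {
  static Result readOneRow(RegionScanner scanner) throws IOException {
    List<Cell> cells = new ArrayList<Cell>(1);
    // next() fills the list with the current row's cells; the return value
    // indicates whether more rows remain in the scanner.
    boolean moreRows = scanner.next(cells);
    assert !moreRows : "expected a single-row scan";
    return Result.create(cells);
  }
}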
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 8f0ee99..c588e95 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -244,7 +244,6 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
* @param pk primary key of the original row
* @param length total number of bytes of all the values that should be added
* @param values to use when building the key
- * @return
*/
static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
// now build up expected row key, each of the values, in order, followed by the PK and then some
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
index c7019c4..0ec3f96 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -14,13 +14,12 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-
import org.apache.hadoop.hbase.index.covered.Batch;
import org.apache.hadoop.hbase.index.covered.CoveredColumnsIndexBuilder;
import org.apache.hadoop.hbase.index.covered.LocalTableState;
import org.apache.hadoop.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is a
@@ -108,10 +107,11 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
Collection<Batch> batches = batchByRow(filtered);
for (Batch batch : batches) {
- Put p = new Put(batch.getKvs().iterator().next().getRow());
+ KeyValue curKV = batch.getKvs().iterator().next();
+ Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength());
for (KeyValue kv : batch.getKvs()) {
// we only need to cleanup Put entries
- byte type = kv.getType();
+ byte type = kv.getTypeByte();
Type t = KeyValue.Type.codeToType(type);
if (!t.equals(Type.Put)) {
continue;
@@ -136,7 +136,6 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
/**
* @param filtered
- * @return
*/
private Collection<Batch> batchByRow(Collection<KeyValue> filtered) {
Map<Long, Batch> batches = new HashMap<Long, Batch>();
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
index ebd2abe..658e981 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
@@ -27,9 +27,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.filter.FilterBase;
@@ -105,13 +108,14 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
}
@Override
- public ReturnCode filterKeyValue(KeyValue next) {
+ public ReturnCode filterKeyValue(Cell next) {
// we marked ourselves done, but the END_ROW_KEY didn't manage to seek to the very last key
if (this.done) {
return ReturnCode.SKIP;
}
- switch (KeyValue.Type.codeToType(next.getType())) {
+ KeyValue nextKV = KeyValueUtil.ensureKeyValue(next);
+ switch (KeyValue.Type.codeToType(next.getTypeByte())) {
/*
* DeleteFamily will always sort first because those KVs (we assume) don't have qualifiers (or
* rather are null). Therefore, we have to keep a hold of all the delete families until we get
@@ -123,20 +127,20 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
// one. In fact, it means that all the previous deletes can be ignored because the family must
// not match anymore.
this.coveringDelete.reset();
- this.coveringDelete.deleteFamily = next;
+ this.coveringDelete.deleteFamily = nextKV;
return ReturnCode.SKIP;
case DeleteColumn:
// similar to deleteFamily, all the newer deletes/puts would have been seen at this point, so
// we can safely replace the more recent delete column with the more recent one
this.coveringDelete.pointDelete = null;
- this.coveringDelete.deleteColumn = next;
+ this.coveringDelete.deleteColumn = nextKV;
return ReturnCode.SKIP;
case Delete:
// we are just deleting the single column value at this point.
// therefore we just skip this entry and go onto the next one. The only caveat is that
// we should still cover the next entry if this delete applies to the next entry, so we
// have to keep around a reference to the KV to compare against the next valid entry
- this.coveringDelete.pointDelete = next;
+ this.coveringDelete.pointDelete = nextKV;
return ReturnCode.SKIP;
default:
// no covering deletes
@@ -144,18 +148,18 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
return ReturnCode.INCLUDE;
}
- if (coveringDelete.matchesFamily(next)) {
+ if (coveringDelete.matchesFamily(nextKV)) {
this.currentHint = familyHint;
return ReturnCode.SEEK_NEXT_USING_HINT;
}
- if (coveringDelete.matchesColumn(next)) {
+ if (coveringDelete.matchesColumn(nextKV)) {
// hint to the next column
this.currentHint = columnHint;
return ReturnCode.SEEK_NEXT_USING_HINT;
}
- if (coveringDelete.matchesPoint(next)) {
+ if (coveringDelete.matchesPoint(nextKV)) {
return ReturnCode.SKIP;
}
@@ -165,16 +169,6 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
return ReturnCode.INCLUDE;
}
- @Override
- public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
- }
-
/**
* Get the next hint for a given peeked keyvalue
*/
@@ -247,7 +241,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
if (deleteFamily == null) {
return false;
}
- if (deleteFamily.matchingFamily(next)) {
+ if (CellUtil.matchingFamily(deleteFamily, next)) {
// falls within the timestamp range
if (deleteFamily.getTimestamp() >= next.getTimestamp()) {
return true;
@@ -269,7 +263,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
if (deleteColumn == null) {
return false;
}
- if (deleteColumn.matchingFamily(next) && deleteColumn.matchingQualifier(next)) {
+ if (CellUtil.matchingFamily(deleteColumn, next) && deleteColumn.matchingQualifier(next)) {
// falls within the timestamp range
if (deleteColumn.getTimestamp() >= next.getTimestamp()) {
return true;
@@ -289,7 +283,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
// that the timestamp matches exactly. Because we sort by timestamp first, either the next
// keyvalue has the exact timestamp or is an older (smaller) timestamp, and we can allow that
// one.
- if (pointDelete != null && pointDelete.matchingFamily(next)
+ if (pointDelete != null && CellUtil.matchingFamily(pointDelete, next)
&& pointDelete.matchingQualifier(next)) {
if (pointDelete.getTimestamp() == next.getTimestamp()) {
return true;
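For orientation, a minimal illustrative sketch (not from the commit) of a 0.98-style FilterBase subclass that works against Cell and uses CellUtil for family matching, mirroring the changes above; the filter class itself is hypothetical:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative server-side filter: skip any cell outside a single column family.
public class SingleFamilyOnlyFilter extends FilterBase {
  private final byte[] family;

  public SingleFamilyOnlyFilter(String family) {
    this.family = Bytes.toBytes(family);
  }

  @Override
  public ReturnCode filterKeyValue(Cell cell) {
    // 0.98: filterKeyValue() receives a Cell; CellUtil replaces KeyValue.matchingFamily().
    return CellUtil.matchingFamily(cell, family)
        ? ReturnCode.INCLUDE
        : ReturnCode.SKIP;
  }
}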
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
index 72a10e1..494bf66 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
@@ -19,10 +19,7 @@
*/
package org.apache.hadoop.hbase.index.covered.filter;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
@@ -53,7 +50,7 @@ public class ColumnTrackingNextLargestTimestampFilter extends FilterBase {
}
@Override
- public ReturnCode filterKeyValue(KeyValue v) {
+ public ReturnCode filterKeyValue(Cell v) {
long timestamp = v.getTimestamp();
if (timestamp > ts) {
this.column.setTs(timestamp);
@@ -62,13 +59,4 @@ public class ColumnTrackingNextLargestTimestampFilter extends FilterBase {
return ReturnCode.INCLUDE;
}
- @Override
- public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
index 8591f88..7c35786 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
@@ -17,10 +17,10 @@
*/
package org.apache.hadoop.hbase.index.covered.filter;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
/**
* Similar to the {@link FamilyFilter} but stops when the end of the family is reached and only
@@ -39,7 +39,7 @@ public class FamilyOnlyFilter extends FamilyFilter {
this(new BinaryComparator(family));
}
- public FamilyOnlyFilter(final WritableByteArrayComparable familyComparator) {
+ public FamilyOnlyFilter(final ByteArrayComparable familyComparator) {
super(CompareOp.EQUAL, familyComparator);
}
@@ -56,7 +56,7 @@ public class FamilyOnlyFilter extends FamilyFilter {
}
@Override
- public ReturnCode filterKeyValue(KeyValue v) {
+ public ReturnCode filterKeyValue(Cell v) {
if (done) {
return ReturnCode.SKIP;
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
index 846ec88..92e9daf 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
@@ -23,6 +23,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +45,13 @@ public class MaxTimestampFilter extends FilterBase {
public KeyValue getNextKeyHint(KeyValue currentKV) {
// this might be a little excessive right now - better safe than sorry though, so we don't mess
// with other filters too much.
- KeyValue kv = currentKV.deepCopy();
+ KeyValue kv = null;
+ try {
+ kv = currentKV.clone();
+ } catch (CloneNotSupportedException e) {
+ // the exception should not happen at all
+ throw new IllegalArgumentException(e);
+ }
int offset =kv.getTimestampOffset();
//set the timestamp in the buffer
byte[] buffer = kv.getBuffer();
@@ -55,22 +62,11 @@ public class MaxTimestampFilter extends FilterBase {
}
@Override
- public ReturnCode filterKeyValue(KeyValue v) {
+ public ReturnCode filterKeyValue(Cell v) {
long timestamp = v.getTimestamp();
if (timestamp > ts) {
return ReturnCode.SEEK_NEXT_USING_HINT;
}
return ReturnCode.INCLUDE;
}
-
- @Override
- public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
-
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
index 560cdd8..8e0f617 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
@@ -4,6 +4,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
@@ -11,7 +12,6 @@ import org.apache.hadoop.hbase.filter.FilterBase;
* Server-side only class used in the indexer to filter out keyvalues newer than a given timestamp
* (so allows anything <code><=</code> timestamp through).
* <p>
- * Note,<tt>this</tt> doesn't support {@link #write(DataOutput)} or {@link #readFields(DataInput)}.
*/
public class NewerTimestampFilter extends FilterBase {
@@ -22,16 +22,8 @@ public class NewerTimestampFilter extends FilterBase {
}
@Override
- public ReturnCode filterKeyValue(KeyValue ignored) {
+ public ReturnCode filterKeyValue(Cell ignored) {
return ignored.getTimestamp() > timestamp ? ReturnCode.SKIP : ReturnCode.INCLUDE;
}
- @Override
- public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("TimestampFilter is server-side only!");
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("TimestampFilter is server-side only!");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
index 7ace79d..77b6e85 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -224,12 +224,13 @@ public class IndexUpdateManager {
+ ((m instanceof Put) ? m.getTimeStamp() + " " : ""));
sb.append(" row=" + Bytes.toString(m.getRow()));
sb.append("\n");
- if (m.getFamilyMap().isEmpty()) {
+ if (m.getFamilyCellMap().isEmpty()) {
sb.append("\t\t=== EMPTY ===\n");
}
- for (List<KeyValue> kvs : m.getFamilyMap().values()) {
- for (KeyValue kv : kvs) {
- sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValue()));
+ for (List<Cell> kvs : m.getFamilyCellMap().values()) {
+ for (Cell kv : kvs) {
+ sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValueArray(),
+ kv.getValueOffset(), kv.getValueLength()));
sb.append("\n");
}
}
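A minimal, self-contained sketch of the 0.98 accessors the hunk above switches to: getFamilyCellMap() replaces getFamilyMap(), and a Cell's value is read through the (array, offset, length) triple rather than KeyValue.getValue(), since a Cell may point into a shared buffer.

    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    static void dumpValues(Put put) {
      for (List<Cell> cells : put.getFamilyCellMap().values()) {
        for (Cell cell : cells) {
          // copy-free read of the value bytes backing this cell
          String value = Bytes.toStringBinary(cell.getValueArray(),
              cell.getValueOffset(), cell.getValueLength());
          System.out.println(cell + "/value=" + value);
        }
      }
    }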
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
index c8afb04..2a343f0 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
@@ -74,7 +74,6 @@ public class ThreadPoolManager {
/**
* @param conf
- * @return
*/
private static ShutdownOnUnusedThreadPoolExecutor getDefaultExecutor(ThreadPoolBuilder builder) {
int maxThreads = builder.getMaxThreads();
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
index 0f7fed3..f3deb6a 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -125,7 +125,6 @@ public class FilteredKeyValueScanner implements KeyValueScanner {
this.delegate.close();
}
- /*
@Override
public boolean backwardSeek(KeyValue arg0) throws IOException {
return this.delegate.backwardSeek(arg0);
@@ -140,5 +139,4 @@ public class FilteredKeyValueScanner implements KeyValueScanner {
public boolean seekToPreviousRow(KeyValue arg0) throws IOException {
return this.delegate.seekToPreviousRow(arg0);
}
- */
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
index 5ded879..2af2c7d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
@@ -7,6 +7,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;
@@ -40,7 +41,7 @@ public class CoprocessorHTableFactory implements HTableFactory {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
}
- return this.e.getTable(tablename.copyBytesIfNecessary());
+ return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
}
@Override
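A small sketch of the 0.96+ coprocessor API shape this hunk targets: CoprocessorEnvironment#getTable() takes a TableName, so raw table-name bytes get wrapped with TableName.valueOf() first.

    import java.io.IOException;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HTableInterface;

    static HTableInterface openTable(CoprocessorEnvironment env, byte[] tableNameBytes)
        throws IOException {
      return env.getTable(TableName.valueOf(tableNameBytes));
    }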
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
index 9ee81a9..16b3584 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.index.ValueGetter;
@@ -65,7 +66,8 @@ public class IndexManagementUtil {
} catch (Throwable t) {
return false;
}
- if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf.get(WAL_EDIT_CODEC_CLASS_KEY, null))) {
+ if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf
+ .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, null))) {
// it's installed, and it can handle both compression and non-compression cases
return true;
}
@@ -91,7 +93,8 @@ public class IndexManagementUtil {
if (indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) { throw new IllegalStateException(
"WAL Compression is only supported with " + codecClass
- + ". You can install in hbase-site.xml, under " + WAL_EDIT_CODEC_CLASS_KEY); }
+ + ". You can install in hbase-site.xml, under " + WALCellCodec.WAL_CELL_CODEC_CLASS_KEY);
+ }
} else {
throw new IllegalStateException(codecClass + " is not installed, but "
+ indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
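A hedged sketch of how the codec ends up installed under the 0.98 key this hunk checks (WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, i.e. hbase.regionserver.wal.codec); in a real deployment the equivalent entry lives in hbase-site.xml rather than being set in code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;

    static Configuration withIndexWalCodec() {
      Configuration conf = HBaseConfiguration.create();
      // the class name IndexManagementUtil compares against (INDEX_WAL_EDIT_CODEC_CLASS_NAME)
      conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY,
          "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
      return conf;
    }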
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
index a7f4e82..5b2c6b4 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
@@ -8,8 +8,13 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
@@ -46,12 +51,20 @@ public class IndexedKeyValue extends KeyValue {
}
/**
- * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an {@link HLog#METAFAMILY} so it
+ * This is a KeyValue that shouldn't actually be replayed, so we always mark it as a {@link WALEdit#METAFAMILY} so it
* isn't replayed via the normal replay mechanism
*/
@Override
public boolean matchingFamily(final byte[] family) {
- return Bytes.equals(family, HLog.METAFAMILY);
+ return Bytes.equals(family, WALEdit.METAFAMILY);
+ }
+
+ /**
+ * Not a real KeyValue
+ */
+ @Override
+ public boolean matchingRow(final byte [] row) {
+ return false;
}
@Override
@@ -77,22 +90,11 @@ public class IndexedKeyValue extends KeyValue {
}
private byte[] getMutationBytes() {
- ByteArrayOutputStream bos = null;
try {
- bos = new ByteArrayOutputStream();
- this.mutation.write(new DataOutputStream(bos));
- bos.flush();
- return bos.toByteArray();
+ MutationProto m = toMutationProto(this.mutation);
+ return m.toByteArray();
} catch (IOException e) {
throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
- } finally {
- if (bos != null) {
- try {
- bos.close();
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
- }
- }
}
}
@@ -101,11 +103,6 @@ public class IndexedKeyValue extends KeyValue {
return hashCode;
}
- @Override
- public void write(DataOutput out) throws IOException {
- KeyValueCodec.write(out, this);
- }
-
/**
* Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done
* via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
@@ -118,8 +115,8 @@ public class IndexedKeyValue extends KeyValue {
*/
void writeData(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.indexTableName.get());
- out.writeUTF(this.mutation.getClass().getName());
- this.mutation.write(out);
+ MutationProto m = toMutationProto(this.mutation);
+ Bytes.writeByteArray(out, m.toByteArray());
}
/**
@@ -127,22 +124,12 @@ public class IndexedKeyValue extends KeyValue {
* complement to {@link #writeData(DataOutput)}.
*/
@SuppressWarnings("javadoc")
- @Override
public void readFields(DataInput in) throws IOException {
this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
- Class<? extends Mutation> clazz;
- try {
- clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class);
- this.mutation = clazz.newInstance();
- this.mutation.readFields(in);
- this.hashCode = calcHashCode(indexTableName, mutation);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- }
+ byte[] mutationData = Bytes.readByteArray(in);
+ MutationProto mProto = MutationProto.parseFrom(mutationData);
+ this.mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto);
+ this.hashCode = calcHashCode(indexTableName, mutation);
}
public boolean getBatchFinished() {
@@ -152,4 +139,18 @@ public class IndexedKeyValue extends KeyValue {
public void markBatchFinished() {
this.batchFinished = true;
}
+
+ protected MutationProto toMutationProto(Mutation mutation) throws IOException {
+ MutationProto m = null;
+ if(mutation instanceof Put){
+ m = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.PUT,
+ mutation);
+ } else if(mutation instanceof Delete) {
+ m = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.DELETE,
+ mutation);
+ } else {
+ throw new IOException("Only Put/Delete mutations are supported");
+ }
+ return m;
+ }
}
\ No newline at end of file
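A minimal round-trip sketch of the protobuf serialization this file now relies on (the helper name is illustrative): a Put becomes a ClientProtos.MutationProto, goes to bytes, and is rebuilt with ProtobufUtil on the way back, the same pair of calls used in toMutationProto() and readFields() above.

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;

    static Mutation roundTrip(Put put) throws IOException {
      MutationProto proto = ProtobufUtil.toMutation(MutationType.PUT, put);
      byte[] wire = proto.toByteArray();           // what getMutationBytes()/writeData() persist
      return ProtobufUtil.toMutation(MutationProto.parseFrom(wire)); // what readFields() rebuilds
    }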
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
index 0abdf8d..3340edc 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
@@ -7,6 +7,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -49,16 +50,14 @@ public class KeyValueCodec {
*/
public static KeyValue readKeyValue(DataInput in) throws IOException {
int length = in.readInt();
- KeyValue kv;
// it's a special IndexedKeyValue
if (length == INDEX_TYPE_LENGTH_MARKER) {
- kv = new IndexedKeyValue();
+ IndexedKeyValue kv = new IndexedKeyValue();
kv.readFields(in);
+ return kv;
} else {
- kv = new KeyValue();
- kv.readFields(length, in);
+ return KeyValue.create(length, in);
}
- return kv;
}
/**
@@ -73,7 +72,7 @@ public class KeyValueCodec {
out.writeInt(INDEX_TYPE_LENGTH_MARKER);
((IndexedKeyValue) kv).writeData(out);
} else {
- kv.write(out);
+ KeyValue.write(kv, out);
}
}
}
\ No newline at end of file
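A short sketch of the static KeyValue stream helpers the codec switches to here (the Writable readFields()/write() instance methods are gone in 0.98): KeyValue.write() emits an int length followed by the cell bytes, and KeyValue.create(length, in) reads that many bytes back, which is the framing readKeyValue() relies on when it checks the length marker.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.KeyValue;

    static KeyValue roundTrip(KeyValue kv) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      KeyValue.write(kv, new DataOutputStream(bytes));
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
      int length = in.readInt();           // length prefix written by KeyValue.write()
      return KeyValue.create(length, in);  // rebuilds the cell from the next 'length' bytes
    }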
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
index bad82c4..dcef90a 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
@@ -8,6 +8,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.io.Writable;
@@ -31,122 +32,12 @@ import org.apache.hadoop.io.Writable;
* we need to track which of the regions were on the server when it crashed and only split those
* edits out into their respective regions.
*/
-public class IndexedHLogReader implements Reader {
+public class IndexedHLogReader extends ProtobufLogReader {
private static final Log LOG = LogFactory.getLog(IndexedHLogReader.class);
- private SequenceFileLogReader delegate;
-
-
- private static class IndexedWALReader extends SequenceFileLogReader.WALReader {
-
- /**
- * @param fs
- * @param p
- * @param c
- * @throws IOException
- */
- IndexedWALReader(FileSystem fs, Path p, Configuration c) throws IOException {
- super(fs, p, c);
- }
-
- /**
- * we basically have to reproduce what the SequenceFile.Reader is doing in next(), but without
- * the check on the value class, since we have a special value class that doesn't directly match
- * what was specified in the file header
- */
- @Override
- public synchronized boolean next(Writable key, Writable val) throws IOException {
- boolean more = next(key);
-
- if (more) {
- getCurrentValue(val);
- }
-
- return more;
- }
-
- }
-
- public IndexedHLogReader() {
- this.delegate = new SequenceFileLogReader();
- }
-
- @Override
- public void init(final FileSystem fs, final Path path, Configuration conf) throws IOException {
- this.delegate.init(fs, path, conf);
- // close the old reader and replace with our own, custom one
- this.delegate.reader.close();
- this.delegate.reader = new IndexedWALReader(fs, path, conf);
- Exception e = new Exception();
- LOG.info("Instantiated indexed log reader." + Arrays.toString(e.getStackTrace()));
- LOG.info("Got conf: " + conf);
- }
-
- @Override
- public void close() throws IOException {
- this.delegate.close();
- }
-
- @Override
- public Entry next() throws IOException {
- return next(null);
- }
-
- @Override
- public Entry next(Entry reuse) throws IOException {
- delegate.entryStart = delegate.reader.getPosition();
- HLog.Entry e = reuse;
- if (e == null) {
- HLogKey key;
- if (delegate.keyClass == null) {
- key = HLog.newKey(delegate.conf);
- } else {
- try {
- key = delegate.keyClass.newInstance();
- } catch (InstantiationException ie) {
- throw new IOException(ie);
- } catch (IllegalAccessException iae) {
- throw new IOException(iae);
- }
- }
- WALEdit val = new WALEdit();
- e = new HLog.Entry(key, val);
- }
-
- // now read in the HLog.Entry from the WAL
- boolean nextPairValid = false;
- try {
- if (delegate.compressionContext != null) {
- throw new UnsupportedOperationException(
- "Reading compression isn't supported with the IndexedHLogReader! Compresed WALEdits "
- + "are only support for HBase 0.94.9+ and with the IndexedWALEditCodec!");
- }
- // this is the special bit - we use our custom entry to read in the key-values that have index
- // information, but otherwise it looks just like a regular WALEdit
- IndexedWALEdit edit = new IndexedWALEdit(e.getEdit());
- nextPairValid = delegate.reader.next(e.getKey(), edit);
- } catch (IOException ioe) {
- throw delegate.addFileInfoToException(ioe);
- }
- delegate.edit++;
- if (delegate.compressionContext != null && delegate.emptyCompressionContext) {
- delegate.emptyCompressionContext = false;
- }
- return nextPairValid ? e : null;
- }
-
- @Override
- public void seek(long pos) throws IOException {
- this.delegate.seek(pos);
- }
-
- @Override
- public long getPosition() throws IOException {
- return this.delegate.getPosition();
- }
-
@Override
- public void reset() throws IOException {
- this.delegate.reset();
+ protected void initAfterCompression() throws IOException {
+ conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName());
+ super.initAfterCompression();
}
}
\ No newline at end of file
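A hedged sketch of wiring in the reader (an equivalent entry would normally go in hbase-site.xml), assuming HLOG_READER_IMPL_KEY in IndexManagementUtil maps to the standard hbase.regionserver.hlog.reader.impl key; the codec needs no separate setting on the read path because initAfterCompression() above forces IndexedWALEditCodec.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    static Configuration withIndexedReader() {
      Configuration conf = HBaseConfiguration.create();
      conf.set("hbase.regionserver.hlog.reader.impl",
          "org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader");
      return conf;
    }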
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
deleted file mode 100644
index 6749cc9..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
-
-/**
- * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader
- * <p>
- * This class should only be used with HBase < 0.94.9. Newer installations of HBase should
- * instead use the IndexedWALEditCodec along with the correct configuration options.
- */
-public class IndexedWALEdit extends WALEdit {
- //reproduced here so we don't need to modify the HBase source.
- private static final int VERSION_2 = -1;
- private WALEdit delegate;
-
- /**
- * Copy-constructor. Only does a surface copy of the delegates fields - no actual data is copied, only referenced.
- * @param delegate to copy
- */
- @SuppressWarnings("deprecation")
- public IndexedWALEdit(WALEdit delegate) {
- this.delegate = delegate;
- // reset the delegate's fields
- this.delegate.getKeyValues().clear();
- if (this.delegate.getScopes() != null) {
- this.delegate.getScopes().clear();
- }
- }
-
- public IndexedWALEdit() {
-
- }
-
- @Override
-public void setCompressionContext(CompressionContext context) {
- throw new UnsupportedOperationException(
- "Compression not supported for IndexedWALEdit! If you are using HBase 0.94.9+, use IndexedWALEditCodec instead.");
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void readFields(DataInput in) throws IOException {
- delegate.getKeyValues().clear();
- if (delegate.getScopes() != null) {
- delegate.getScopes().clear();
- }
- // ----------------------------------------------------------------------------------------
- // no compression, so we do pretty much what the usual WALEdit does, plus a little magic to
- // capture the index updates
- // -----------------------------------------------------------------------------------------
- int versionOrLength = in.readInt();
- if (versionOrLength != VERSION_2) {
- throw new IOException("You must update your cluster to the lastest version of HBase and"
- + " clean out all logs (cleanly start and then shutdown) before enabling indexing!");
- }
- // this is new style HLog entry containing multiple KeyValues.
- List<KeyValue> kvs = KeyValueCodec.readKeyValues(in);
- delegate.getKeyValues().addAll(kvs);
-
- // then read in the rest of the WALEdit
- int numFamilies = in.readInt();
- NavigableMap<byte[], Integer> scopes = delegate.getScopes();
- if (numFamilies > 0) {
- if (scopes == null) {
- scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- }
- for (int i = 0; i < numFamilies; i++) {
- byte[] fam = Bytes.readByteArray(in);
- int scope = in.readInt();
- scopes.put(fam, scope);
- }
- delegate.setScopes(scopes);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- throw new IOException(
- "Indexed WALEdits aren't written directly out - use IndexedKeyValues instead");
- }
-}
\ No newline at end of file