Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/27 18:35:44 UTC
[1/3] phoenix git commit: Secondary indexing with txns
Repository: phoenix
Updated Branches:
refs/heads/txn 826ebf5ce -> 5a558e16c
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index eb117bf..1e28766 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -18,91 +18,41 @@
package org.apache.phoenix.index;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.util.IndexUtil;
/**
* Index builder for a covered-columns index that ties into Phoenix for faster use.
*/
-public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
+public class PhoenixIndexBuilder extends NonTxIndexBuilder {
+
+ @Override
+ public void setup(RegionCoprocessorEnvironment env) throws IOException {
+ super.setup(env);
+ Configuration conf = env.getConfiguration();
+ // Install handler that will attempt to disable the index before killing the region
+ // server
+ conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
+ PhoenixIndexFailurePolicy.class.getName());
+ }
@Override
public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
// The entire purpose of this method impl is to get the existing rows for the
// table rows being indexed into the block cache, as the index maintenance code
- // does a point scan per row
- List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
- Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
- new HashMap<ImmutableBytesWritable, IndexMaintainer>();
- ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
- for (int i = 0; i < miniBatchOp.size(); i++) {
- Mutation m = miniBatchOp.getOperation(i);
- keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
- List<IndexMaintainer> indexMaintainers = getCodec().getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
-
- for(IndexMaintainer indexMaintainer: indexMaintainers) {
- if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
- indexTableName.set(indexMaintainer.getIndexTableName());
- if (maintainers.get(indexTableName) != null) continue;
- maintainers.put(indexTableName, indexMaintainer);
- }
-
- }
- if (maintainers.isEmpty()) return;
- Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
- ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
- scanRanges.initializeScan(scan);
- scan.setFilter(scanRanges.getSkipScanFilter());
- HRegion region = this.env.getRegion();
- RegionScanner scanner = region.getScanner(scan);
- // Run through the scanner using internal nextRaw method
- region.startRegionOperation();
- try {
- synchronized (scanner) {
- boolean hasMore;
- do {
- List<Cell> results = Lists.newArrayList();
- // Results are potentially returned even when the return value of s.next is
- // false since this is an indication of whether or not there are more values
- // after the ones returned
- hasMore = scanner.nextRaw(results);
- } while (hasMore);
- }
- } finally {
- try {
- scanner.close();
- } finally {
- region.closeRegionOperation();
- }
- }
+ // does a point scan per row.
+ // TODO: provide a means for the transactional case to just return the Scanner
+ // when this is executed, as that seems like it would be more efficient.
+ IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
}
private PhoenixIndexCodec getCodec() {
return (PhoenixIndexCodec)this.codec;
}
-
- @Override
- public byte[] getBatchId(Mutation m){
- return this.codec.getBatchId(m);
- }
}
\ No newline at end of file
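The setup() override above uses Configuration.setIfUnset(), so PhoenixIndexFailurePolicy is installed only as a default and any operator-configured failure policy survives. A minimal sketch of that behavior, assuming a hypothetical property key in place of the real IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY constant:

    import org.apache.hadoop.conf.Configuration;

    public class SetIfUnsetDemo {
        public static void main(String[] args) {
            // Hypothetical key standing in for IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY
            String key = "index.writer.failure.policy";

            // No operator override: the Phoenix default gets installed.
            Configuration conf = new Configuration(false);
            conf.setIfUnset(key, "org.apache.phoenix.index.PhoenixIndexFailurePolicy");
            System.out.println(conf.get(key)); // ...PhoenixIndexFailurePolicy

            // Operator override present: setIfUnset() is a no-op.
            Configuration conf2 = new Configuration(false);
            conf2.set(key, "com.example.CustomFailurePolicy");
            conf2.setIfUnset(key, "org.apache.phoenix.index.PhoenixIndexFailurePolicy");
            System.out.println(conf2.get(key)); // com.example.CustomFailurePolicy
        }
    }

Contrast this with PhoenixTxIndexBuilder further below, which uses conf.set() and therefore always forces its own policy.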
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 8b507b6..7b77c37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.index;
@@ -25,7 +17,6 @@ import java.util.Map;
import co.cask.tephra.Transaction;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -41,13 +32,12 @@ import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
@@ -63,42 +53,28 @@ import com.google.common.collect.Lists;
public class PhoenixIndexCodec extends BaseIndexCodec {
public static final String INDEX_MD = "IdxMD";
public static final String INDEX_UUID = "IdxUUID";
+ public static final String INDEX_MAINTAINERS = "IndexMaintainers";
private RegionCoprocessorEnvironment env;
- private KeyValueBuilder kvBuilder;
+ private KeyValueBuilder kvBuilder = GenericKeyValueBuilder.INSTANCE;
@Override
- public void initialize(RegionCoprocessorEnvironment env) {
+ public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+ super.initialize(env);
this.env = env;
- Configuration conf = env.getConfiguration();
- // Install handler that will attempt to disable the index first before killing the region
- // server
- conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
- PhoenixIndexFailurePolicy.class.getName());
- // Use the GenericKeyValueBuilder, as it's been shown in perf testing that ClientKeyValue doesn't help
- // TODO: Jesse to investigate more
- this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
}
boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
- if (attributes == null) {
- return false;
- }
+ if (attributes == null) { return false; }
byte[] uuid = attributes.get(INDEX_UUID);
- if (uuid == null) {
- return false;
- }
+ if (uuid == null) { return false; }
return true;
}
-
- IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException{
- if (attributes == null) {
- return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
- }
+
+ public IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException {
+ if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
byte[] uuid = attributes.get(INDEX_UUID);
- if (uuid == null) {
- return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
- }
+ if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
byte[] md = attributes.get(INDEX_MD);
byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
if (md != null) {
@@ -107,8 +83,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
return new IndexMetaDataCache() {
@Override
- public void close() throws IOException {
- }
+ public void close() throws IOException {}
@Override
public List<IndexMaintainer> getIndexMaintainers() {
@@ -119,26 +94,24 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
public Transaction getTransaction() {
return txn;
}
-
+
};
} else {
byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
- ImmutableBytesWritable tenantId =
- tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
+ ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
- IndexMetaDataCache indexCache =
- (IndexMetaDataCache) cache.getServerCache(new ImmutableBytesPtr(uuid));
+ IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
if (indexCache == null) {
- String msg = "key="+ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
+ String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
- .setMessage(msg).build().buildException();
+ .setMessage(msg).build().buildException();
ServerUtil.throwIOException("Index update failed", e); // will not return
}
return indexCache;
}
-
+
}
-
+
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
return getIndexUpdates(state, true);
@@ -150,18 +123,16 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
}
/**
- *
* @param state
- * @param upsert prepare index upserts if it's true otherwise prepare index deletes.
+ * @param upsert
+ * prepare index upserts if true; otherwise prepare index deletes.
* @return
* @throws IOException
*/
private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
- IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
- List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
- if (indexMaintainers.isEmpty()) {
- return Collections.emptyList();
- }
+ @SuppressWarnings("unchecked")
+ List<IndexMaintainer> indexMaintainers = (List<IndexMaintainer>)state.getContext().get(INDEX_MAINTAINERS);
+ if (indexMaintainers.isEmpty()) { return Collections.emptyList(); }
List<IndexUpdate> indexUpdates = Lists.newArrayList();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
// TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
@@ -171,7 +142,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
ValueGetter valueGetter = null;
Scanner scanner = null;
for (IndexMaintainer maintainer : indexMaintainers) {
- if(upsert) {
+ if (upsert) {
// Short-circuit building state when we know it's a row deletion
if (maintainer.isRowDeleted(state.getPendingUpdate())) {
continue;
@@ -180,31 +151,25 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
IndexUpdate indexUpdate = null;
if (maintainer.isImmutableRows()) {
indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
- if(maintainer.isLocalIndex()) {
+ if (maintainer.isLocalIndex()) {
indexUpdate.setTable(localIndexTableName);
} else {
indexUpdate.setTable(maintainer.getIndexTableName());
}
valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
} else {
- // TODO: if more efficient, I could do this just once with all columns in all indexes
- Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());
- scanner = statePair.getFirst();
+ Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+ valueGetter = statePair.getFirst();
indexUpdate = statePair.getSecond();
indexUpdate.setTable(maintainer.getIndexTableName());
- valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
}
Mutation mutation = null;
if (upsert) {
- mutation =
- maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state
- .getCurrentTimestamp(), env.getRegion().getStartKey(), env
- .getRegion().getEndKey());
+ mutation = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), env
+ .getRegion().getStartKey(), env.getRegion().getEndKey());
} else {
- mutation =
- maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state
- .getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion()
- .getStartKey(), env.getRegion().getEndKey());
+ mutation = maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(),
+ state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
}
indexUpdate.setUpdate(mutation);
if (scanner != null) {
@@ -215,15 +180,25 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
}
return indexUpdates;
}
-
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- return !hasIndexMaintainers(m.getAttributesMap());
- }
-
- @Override
- public byte[] getBatchId(Mutation m) {
- Map<String, byte[]> attributes = m.getAttributesMap();
- return attributes.get(INDEX_UUID);
- }
+
+ @Override
+ public boolean isEnabled(Mutation m) throws IOException {
+ return !hasIndexMaintainers(m.getAttributesMap());
+ }
+
+ @Override
+ public byte[] getBatchId(Mutation m) {
+ Map<String, byte[]> attributes = m.getAttributesMap();
+ return attributes.get(INDEX_UUID);
+ }
+
+ @Override
+ public void setContext(TableState state, Mutation mutation) throws IOException {
+ IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
+ List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
+ Map<String,Object> context = state.getContext();
+ context.clear();
+ context.put(INDEX_MAINTAINERS, indexMaintainers);
+ context.put(TxIndexBuilder.TRANSACTION, indexCache.getTransaction());
+ }
}
\ No newline at end of file
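The key behavioral change in this codec is that getIndexUpdates() no longer resolves the IndexMaintainer list from the metadata cache itself; the new setContext() hook runs first and stashes the maintainers (and the transaction) into the TableState's context map under INDEX_MAINTAINERS. A minimal sketch of that handoff, with a plain HashMap standing in for state.getContext() and String standing in for IndexMaintainer (both simplifications, not the real types):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ContextHandoffSketch {
        static final String INDEX_MAINTAINERS = "IndexMaintainers";

        // Stand-in for resolving maintainers from the mutation's attributes.
        static List<String> resolveMaintainers() {
            List<String> maintainers = new ArrayList<String>();
            maintainers.add("MY_INDEX");
            return maintainers;
        }

        public static void main(String[] args) {
            // state.getContext() acts as a per-row scratch map like this one.
            Map<String, Object> context = new HashMap<String, Object>();

            // setContext(): clear, then stash what the update path will need.
            context.clear();
            context.put(INDEX_MAINTAINERS, resolveMaintainers());

            // getIndexUpdates(): read the maintainers back instead of re-fetching.
            @SuppressWarnings("unchecked")
            List<String> maintainers = (List<String>) context.get(INDEX_MAINTAINERS);
            System.out.println("maintainers for this row: " + maintainers);
        }
    }

This lets a builder (transactional or not) do the metadata lookup once per mutation and keeps the codec's per-row path free of cache access.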
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 2fd168a..806a20a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -75,7 +75,7 @@ import com.google.common.collect.Multimap;
*
* @since 2.1
*/
-public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
+public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
private RegionCoprocessorEnvironment env;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
new file mode 100644
index 0000000..c471df6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.util.IndexUtil;
+
+public class PhoenixTxIndexBuilder extends TxIndexBuilder {
+ @Override
+ public void setup(RegionCoprocessorEnvironment env) throws IOException {
+ super.setup(env);
+ Configuration conf = env.getConfiguration();
+ // Install failure policy that just re-throws exception instead of killing RS
+ // or disabling the index
+ conf.set(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, PhoenixTxIndexFailurePolicy.class.getName());
+ }
+
+ @Override
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ // The entire purpose of this method impl is to get the existing rows for the
+ // table rows being indexed into the block cache, as the index maintenance code
+ // does a point scan per row.
+ // TODO: provide a means for the transactional case to just return the Scanner
+ // when this is executed, as that seems like it would be more efficient.
+ IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
+ }
+
+ private PhoenixIndexCodec getCodec() {
+ return (PhoenixIndexCodec)this.codec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
new file mode 100644
index 0000000..fa70cc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
+
+import com.google.common.collect.Multimap;
+
+public class PhoenixTxIndexFailurePolicy implements IndexFailurePolicy {
+ private Stoppable parent;
+
+ @Override
+ public void stop(String why) {
+ if (parent != null) {
+ parent.stop(why);
+ }
+ }
+
+ @Override
+ public boolean isStopped() {
+ return parent == null ? false : parent.isStopped();
+ }
+
+ @Override
+ public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+ throws IOException {
+ if (cause instanceof IOException) {
+ throw (IOException)cause;
+ } else if (cause instanceof RuntimeException) { throw (RuntimeException)cause; }
+ throw new IOException(cause);
+ }
+}
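handleFailure() above reduces to a rethrow idiom: an IOException or RuntimeException propagates as-is, and any other cause is wrapped in an IOException. The point, per the comment in PhoenixTxIndexBuilder.setup(), is that a failed index write surfaces to the caller (so the transaction can fail) instead of disabling the index or killing the region server. A standalone sketch of just that idiom; the helper name is hypothetical:

    import java.io.IOException;

    public class RethrowSketch {
        // Mirrors the policy above: preserve IOException and RuntimeException,
        // wrap any other exception in an IOException.
        static void rethrow(Exception cause) throws IOException {
            if (cause instanceof IOException) {
                throw (IOException) cause;
            } else if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IOException(cause);
        }

        public static void main(String[] args) {
            try {
                rethrow(new InterruptedException("index write interrupted"));
            } catch (IOException e) {
                // The original exception is preserved as the cause.
                System.out.println(e.getCause());
            }
        }
    }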
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 5abf151..c14aa60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -107,11 +107,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.index.PhoenixTxIndexBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -696,6 +697,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
}
+ boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name()));
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
// Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -705,8 +707,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
&& !SchemaUtil.isStatsTable(tableName)
&& !descriptor.hasCoprocessor(Indexer.class.getName())) {
Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(descriptor, isTransactional ? PhoenixTxIndexBuilder.class : PhoenixIndexBuilder.class, opts, priority);
}
if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
@@ -743,7 +745,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- if (Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+ if (isTransactional && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
}
} catch (IOException e) {
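The hunk above wires coprocessors idempotently: hasCoprocessor() guards against double registration, and TransactionProcessor is registered at priority - 10 so it sorts ahead of the Indexer on the same hooks (HBase runs coprocessors in ascending priority order). A minimal sketch of the guard-and-register pattern, using a stand-in class name and a hypothetical base priority:

    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class CoprocessorWiringSketch {
        public static void main(String[] args) throws Exception {
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("MY_TXN_TABLE"));
            int priority = 1000; // hypothetical base priority
            String txProcessor = "com.example.TransactionProcessor"; // stand-in class name
            if (!descriptor.hasCoprocessor(txProcessor)) {
                // Lower numeric priority runs earlier, so priority - 10 puts the
                // transaction processor ahead of a coprocessor registered at priority.
                descriptor.addCoprocessor(txProcessor, null, priority - 10, null);
            }
            System.out.println(descriptor.getCoprocessors());
        }
    }

Hoisting isTransactional into a local also means the Indexer and the TransactionProcessor are now keyed off the same flag, so a transactional table can't end up with one but not the other.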
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
index 6fc480e..1f271f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.schema.tuple;
-import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
-
import java.io.IOException;
import org.apache.hadoop.hbase.HConstants;
@@ -28,7 +26,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
*
@@ -56,13 +53,22 @@ public class ValueGetterTuple extends BaseTuple {
@Override
public KeyValue getValue(byte[] family, byte[] qualifier) {
- ImmutableBytesPtr value = null;
+ ImmutableBytesWritable value = null;
try {
value = valueGetter.getLatestValue(new ColumnReference(family, qualifier));
} catch (IOException e) {
throw new RuntimeException(e);
}
- return new KeyValue(valueGetter.getRowKey(), family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value!=null? copyBytesIfNecessary(value) : null);
+ byte[] rowKey = valueGetter.getRowKey();
+ int valueOffset = 0;
+ int valueLength = 0;
+ byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY;
+ if (value != null) {
+ valueBytes = value.get();
+ valueOffset = value.getOffset();
+ valueLength = value.getLength();
+ }
+ return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP, Type.Put, valueBytes, valueOffset, valueLength);
}
@Override
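The rewritten getValue() hands the value's backing array plus offset and length straight to the KeyValue constructor, so the bytes are copied once when KeyValue assembles its buffer rather than twice (copyBytesIfNecessary previously produced an intermediate trimmed copy whenever the pointer referenced a larger buffer). A small sketch of the same construction, assuming the HBase 0.98-era KeyValue API:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValue.Type;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class NoCopyKeyValueSketch {
        public static void main(String[] args) {
            byte[] rowKey = Bytes.toBytes("row1");
            byte[] family = Bytes.toBytes("0");
            byte[] qualifier = Bytes.toBytes("COL");

            // The writable may point into a larger buffer; honor its offset/length
            // instead of trimming it to a fresh array first.
            byte[] backing = Bytes.toBytes("xxvaluexx");
            ImmutableBytesWritable value = new ImmutableBytesWritable(backing, 2, 5);

            KeyValue kv = new KeyValue(rowKey, 0, rowKey.length,
                    family, 0, family.length,
                    qualifier, 0, qualifier.length,
                    HConstants.LATEST_TIMESTAMP, Type.Put,
                    value.get(), value.getOffset(), value.getLength());
            System.out.println(Bytes.toString(kv.getValue())); // prints "value"
        }
    }

The null-value case changes shape too: instead of passing a null value array, the new code passes HConstants.EMPTY_BYTE_ARRAY with zero offset and length, which the long-form constructor handles uniformly.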
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 49956f9..1fcf16a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -24,6 +24,8 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -41,12 +43,15 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.IndexStatementRewriter;
+import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -60,13 +65,16 @@ import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -235,7 +243,7 @@ public class IndexUtil {
}
@Override
- public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
// Always return null for our empty key value, as this will cause the index
// maintainer to always treat this Put as a new row.
if (isEmptyKeyValue(table, ref)) {
@@ -636,4 +644,58 @@ public class IndexUtil {
return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString())
: col.getExpressionStr();
}
+
+ /*
+ * The entire purpose of this method is to get the existing rows for the table rows being indexed into the
+ * block cache, as the index maintenance code does a point scan per row. For the transactional
+ * case we may be loading more than we need, since we're not applying the transaction filters,
+ * but that should still be ok.
+ */
+ public static void loadMutatingRowsIntoBlockCache(HRegion region, PhoenixIndexCodec codec, MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean useRawScan)
+ throws IOException {
+ List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+ Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
+ new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+ ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
+ List<IndexMaintainer> indexMaintainers = codec.getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
+
+ for(IndexMaintainer indexMaintainer: indexMaintainers) {
+ if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
+ indexTableName.set(indexMaintainer.getIndexTableName());
+ if (maintainers.get(indexTableName) != null) continue;
+ maintainers.put(indexTableName, indexMaintainer);
+ }
+
+ }
+ if (maintainers.isEmpty()) return;
+ Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
+ scan.setRaw(useRawScan);
+ ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+ scanRanges.initializeScan(scan);
+ scan.setFilter(scanRanges.getSkipScanFilter());
+ RegionScanner scanner = region.getScanner(scan);
+ // Run through the scanner using internal nextRaw method
+ region.startRegionOperation();
+ try {
+ synchronized (scanner) {
+ boolean hasMore;
+ do {
+ List<Cell> results = Lists.newArrayList();
+ // Results are potentially returned even when the return value of scanner.nextRaw is
+ // false since this is an indication of whether or not there are more values
+ // after the ones returned
+ hasMore = scanner.nextRaw(results);
+ } while (hasMore);
+ }
+ } finally {
+ try {
+ scanner.close();
+ } finally {
+ region.closeRegionOperation();
+ }
+ }
+ }
}
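loadMutatingRowsIntoBlockCache() builds a single skip scan that seeks to every mutated row key, rather than issuing one point get per row, and then drains the scanner purely for the side effect of pulling those blocks into the cache. A condensed sketch of just the scan construction, with signatures taken from the method above (region plumbing and the scanner drain omitted; the bare new Scan() stands in for IndexManagementUtil.newLocalStateScan, which also restricts the scan to the indexed column families):

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.ScanRanges;
    import org.apache.phoenix.query.KeyRange;
    import org.apache.phoenix.schema.types.PVarbinary;
    import org.apache.phoenix.util.ScanUtil;
    import org.apache.phoenix.util.SchemaUtil;

    import com.google.common.collect.Lists;

    public class SkipScanPrimeSketch {
        // Build one Scan whose skip-scan filter seeks to each mutated row key.
        static Scan buildPrimingScan(List<byte[]> mutatedRowKeys) {
            List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutatedRowKeys.size());
            for (byte[] rowKey : mutatedRowKeys) {
                keys.add(PVarbinary.INSTANCE.getKeyRange(rowKey));
            }
            Scan scan = new Scan();
            ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA,
                    Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
            scanRanges.initializeScan(scan);
            scan.setFilter(scanRanges.getSkipScanFilter());
            return scan;
        }
    }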
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index cb63380..7e73a81 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered;
@@ -24,51 +16,50 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.index.BaseIndexCodec;
/**
- * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless
- * of the current tables' state.
+ * An {@link IndexCodec} for testing that allows you to specify the index updates/deletes, regardless of the current
+ * table's state.
*/
public class CoveredIndexCodecForTesting extends BaseIndexCodec {
- private List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
- private List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+ private List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+ private List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+
+ public void addIndexDelete(IndexUpdate... deletes) {
+ this.deletes.addAll(Arrays.asList(deletes));
+ }
+
+ public void addIndexUpserts(IndexUpdate... updates) {
+ this.updates.addAll(Arrays.asList(updates));
+ }
+
+ public void clear() {
+ this.deletes.clear();
+ this.updates.clear();
+ }
- public void addIndexDelete(IndexUpdate... deletes) {
- this.deletes.addAll(Arrays.asList(deletes));
- }
-
- public void addIndexUpserts(IndexUpdate... updates) {
- this.updates.addAll(Arrays.asList(updates));
- }
+ @Override
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+ return this.deletes;
+ }
- public void clear() {
- this.deletes.clear();
- this.updates.clear();
- }
-
- @Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
- return this.deletes;
- }
+ @Override
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+ return this.updates;
+ }
- @Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
- return this.updates;
- }
+ @Override
+ public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+ // noop
+ }
- @Override
- public void initialize(RegionCoprocessorEnvironment env) throws IOException {
- // noop
- }
+ @Override
+ public boolean isEnabled(Mutation m) {
+ return true;
+ }
- @Override
- public boolean isEnabled(Mutation m) {
- return true;
- }
+ @Override
+ public void setContext(TableState state, Mutation mutation) throws IOException {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index 8c15551..3024820 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -23,9 +23,10 @@ import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -33,17 +34,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.data.LocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
*
@@ -86,7 +84,7 @@ public class TestLocalTableState {
LocalHBaseState state = new LocalTable(env);
LocalTableState table = new LocalTableState(env, state, m);
//add the kvs from the mutation
- table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+ table.addPendingUpdates(m.get(fam, qual));
// setup the lookup
ColumnReference col = new ColumnReference(fam, qual);
@@ -128,8 +126,8 @@ public class TestLocalTableState {
LocalHBaseState state = new LocalTable(env);
LocalTableState table = new LocalTableState(env, state, m);
// add the kvs from the mutation
- KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
- kv.setMvccVersion(0);
+ Cell kv = m.get(fam, qual).get(0);
+ KeyValueUtil.ensureKeyValue(kv).setMvccVersion(0);
table.addPendingUpdates(kv);
// setup the lookup
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index 1b61ef0..f0c1483 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -32,26 +32,22 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec.ColumnEntry;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
public class TestCoveredColumnIndexCodec {
private static final byte[] PK = new byte[] { 'a' };
@@ -190,7 +186,7 @@ public class TestCoveredColumnIndexCodec {
// get the updates with the pending update
state.setCurrentTimestamp(1);
- state.addPendingUpdates(KeyValueUtil.ensureKeyValues(kvs));
+ state.addPendingUpdates(kvs);
updates = codec.getIndexUpserts(state);
assertTrue("Didn't find index updates for pending primary table update!", updates.iterator()
.hasNext());
@@ -243,7 +239,7 @@ public class TestCoveredColumnIndexCodec {
LocalTableState state = new LocalTableState(env, table, d);
state.setCurrentTimestamp(d.getTimeStamp());
// now we shouldn't see anything when getting the index update
- state.addPendingUpdates(KeyValueUtil.ensureKeyValues(d.getFamilyCellMap().get(FAMILY)));
+ state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY));
Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
for (IndexUpdate update : updates) {
assertFalse("Had some index updates, though it should have been covered by the delete",
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 592ac7c..6e1c28f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -73,7 +73,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest {
return new ValueGetter() {
@Override
- public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
return new ImmutableBytesPtr(valueMap.get(ref));
}
[2/3] phoenix git commit: Secondary indexing with txns
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
new file mode 100644
index 0000000..79a485c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+/**
+ * Build covered indexes for phoenix updates.
+ * <p>
+ * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't need to do any extra
+ * synchronization in the IndexBuilder.
+ * <p>
+ * NOTE: This implementation doesn't clean up the index when we remove a key-value on compaction or flush, leading to a
+ * bloated index that needs to be cleaned up by a background process.
+ */
+public class NonTxIndexBuilder extends BaseIndexBuilder {
+ private static final Log LOG = LogFactory.getLog(NonTxIndexBuilder.class);
+
+ protected LocalHBaseState localTable;
+
+ @Override
+ public void setup(RegionCoprocessorEnvironment env) throws IOException {
+ super.setup(env);
+ this.localTable = new LocalTable(env);
+ }
+
+ @Override
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+ // build the index updates for each group
+ IndexUpdateManager updateMap = new IndexUpdateManager();
+
+ batchMutationAndAddUpdates(updateMap, mutation);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap);
+ }
+
+ return updateMap.toMap();
+ }
+
+ /**
+ * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each key-value in the
+ * update to see if it matches the others. Generally, this will be the case, but you can add kvs to a mutation that
+ * don't all have the same timestamp, so we need to manage everything in batches based on timestamp.
+ * <p>
+ * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
+ *
+ * @param manager
+ * index updates into which to add new updates. Modified as a side-effect.
+ * @param m
+ * mutation to batch
+ * @throws IOException
+ */
+ private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
+ // split the mutation into timestamp-based batches
+ Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+
+ // create a state manager, so we can manage each batch
+ LocalTableState state = new LocalTableState(env, localTable, m);
+
+ // go through each batch of keyvalues and build separate index entries for each
+ boolean cleanupCurrentState = true;
+ for (Batch batch : batches) {
+ /*
+ * We have to split the work between the cleanup and the update for each group because when we update the
+ * current state of the row for the current batch (appending the mutations for the current batch) the next
+ * group will see that as the current state, which can cause a delete and a put to be created for
+ * the next group.
+ */
+ if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
+ cleanupCurrentState = false;
+ }
+ }
+ }
+
+ /**
+ * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp
+ * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
+ *
+ * @param m
+ * {@link Mutation} from which to extract the {@link KeyValue}s
+ * @return the mutation, broken into batches and sorted in ascending order (smallest first)
+ */
+ protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
+ Map<Long, Batch> batches = new HashMap<Long, Batch>();
+ for (List<Cell> family : m.getFamilyCellMap().values()) {
+ List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
+ createTimestampBatchesFromKeyValues(familyKVs, batches);
+ }
+ // sort the batches
+ List<Batch> sorted = new ArrayList<Batch>(batches.values());
+ Collections.sort(sorted, new Comparator<Batch>() {
+ @Override
+ public int compare(Batch o1, Batch o2) {
+ return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+ }
+ });
+ return sorted;
+ }
+
+ /**
+ * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a
+ * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
+ *
+ * @param kvs
+ * {@link KeyValue}s to break into batches
+ * @param batches
+ * to update with the given kvs
+ */
+ protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ byte[] nowBytes = Bytes.toBytes(now);
+
+ // batch kvs by timestamp
+ for (KeyValue kv : kvs) {
+ long ts = kv.getTimestamp();
+ // override the timestamp to the current time, so the index and primary tables match
+ // all the keys with LATEST_TIMESTAMP will then be put into the same batch
+ if (kv.updateLatestStamp(nowBytes)) {
+ ts = now;
+ }
+ Batch batch = batches.get(ts);
+ if (batch == null) {
+ batch = new Batch(ts);
+ batches.put(ts, batch);
+ }
+ batch.add(kv);
+ }
+ }
+
+ /**
+ * For a single batch, get all the index updates and add them to the updateMap
+ * <p>
+ * This method manages cleaning up the entire history of the row from the given timestamp forward for out-of-order
+ * (e.g. 'back in time') updates.
+ * <p>
+ * If things arrive out of order (client is using custom timestamps) we should still see the index in the correct
+ * order (assuming we scan after the out-of-order update is finished). Therefore, when we aren't the most recent
+ * update to the index, we need to delete the state at the current timestamp (similar to above), but also issue a
+ * delete for the added index updates at the next newest timestamp of any of the columns in the update; we need to
+ * clean up the insert so it looks like it was also deleted at that next newest timestamp. However, it's not enough to
+ * just update the one in front of us - that column will likely be applied to index entries up the entire history in
+ * front of us, which also needs to be fixed up.
+ * <p>
+ * However, the current update usually will be the most recent thing to be added. In that case, all we need to do is
+ * issue a delete for the previous index row (the state of the row, without the update applied) at the current
+ * timestamp. This gets rid of anything currently in the index for the current state of the row (at the timestamp).
+ * Then we can just follow that by applying the pending update and building the index update based on the new row
+ * state.
+ *
+ * @param updateMap
+ * map to update with new index elements
+ * @param batch
+ * timestamp-based batch of edits
+ * @param state
+ * local state to update and pass to the codec
+ * @param requireCurrentStateCleanup
+ * <tt>true</tt> if we should attempt to clean up the current state of the table, in the event of a
+ * 'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier
+ * batch already did the cleanup.
+ * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt>
+ * otherwise
+ * @throws IOException
+ */
+ private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
+ boolean requireCurrentStateCleanup) throws IOException {
+
+ // need a temporary manager for the current batch. It should resolve any conflicts for the
+ // current batch. Essentially, we can get the case where a batch doesn't change the current
+ // state of the index (all Puts are covered by deletes), in which case we don't want to add
+ // anything
+ // A. Get the correct values for the pending state in the batch
+ // A.1 start by cleaning up the current state - as long as there are key-values in the batch
+ // that are indexed, we need to change the current state of the index. It's up to the codec to
+ // determine if we need to make any cleanup given the pending update.
+ long batchTs = batch.getTimestamp();
+ state.setPendingUpdates(batch.getKvs());
+ addCleanupForCurrentBatch(updateMap, batchTs, state);
+
+ // A.2 do a single pass first for the updates to the current state
+ state.applyPendingUpdates();
+ long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
+ // if all the updates are the latest thing in the index, we are done - don't go and fix history
+ if (ColumnTracker.isNewestTime(minTs)) { return false; }
+
+ // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
+ // index. After this, we have the correct view of the index from the batch timestamp forward
+ while (!ColumnTracker.isNewestTime(minTs)) {
+ minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
+ }
+
+ // B. only clean up the current state if we need to - it's a huge waste of effort otherwise.
+ if (requireCurrentStateCleanup) {
+ // roll back the pending update. This is needed so we can remove all the 'old' index entries.
+ // We don't need to do the puts here, but just the deletes at the given timestamps since we
+ // just want to completely hide the incorrect entries.
+ state.rollback(batch.getKvs());
+ // setup state
+ state.setPendingUpdates(batch.getKvs());
+
+ // clean up the pending batch. If anything in the correct history is covered by Deletes used to
+ // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
+ // because the update may have a different set of columns or value based on the update).
+ cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
+
+ // have to roll the state forward again, so the current state is correct
+ state.applyPendingUpdates();
+ return true;
+ }
+ return false;
+ }
+
+ private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap)
+ throws IOException {
+ state.setCurrentTimestamp(ts);
+ ts = addCurrentStateMutationsForBatch(updateMap, state);
+ return ts;
+ }
+
+ private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+ throws IOException {
+ // get the cleanup for the current state
+ state.setCurrentTimestamp(batchTs);
+ addDeleteUpdatesToMap(updateMap, state, batchTs);
+ // ignore any index tracking from the delete
+ state.resetTrackedColumns();
+ }
+
+ /**
+ * Add the necessary mutations for the pending batch on the local state. Handles rolling up through history to
+ * determine the index changes after applying the batch (for the case where the batch is back in time).
+ *
+ * @param updateMap
+ * to update with index mutations
+ * @param state
+ * current state of the table
+ * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)}
+ * returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>.
+ * @throws IOException
+ */
+ private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state)
+ throws IOException {
+
+ // get the index updates for this current batch
+ Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
+ state.resetTrackedColumns();
+
+ /*
+ * go through all the pending updates. If we are sure that all the entries are the latest timestamp, we can just
+ * add the index updates and move on. However, if there are columns that we skip past (based on the timestamp of
+ * the batch), we need to roll back up the history. Regardless of whether or not they are the latest timestamp,
+ * the entries here are going to be correct for the current batch timestamp, so we add them to the updates. The
+ * only thing we really care about is whether we need to roll up the history and fix it as we go.
+ */
+ // timestamp of the next update we need to track
+ long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+ List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
+ for (IndexUpdate update : upserts) {
+ // this is the one bit where we check the timestamps
+ final ColumnTracker tracker = update.getIndexedColumns();
+ long trackerTs = tracker.getTS();
+ // update the next min TS we need to track
+ if (trackerTs < minTs) {
+ minTs = tracker.getTS();
+ }
+ // track index hints for the next round. Hint if we need an update for that column for the
+ // next timestamp. These columns clearly won't need to update as we go through time as they
+ // already match the most recent possible thing.
+ boolean needsCleanup = false;
+ if (tracker.hasNewerTimestamps()) {
+ columnHints.add(tracker);
+ // this update also needs to be cleaned up at the next timestamp because it is not the latest.
+ needsCleanup = true;
+ }
+
+ // only make the put if the index update has been setup
+ if (update.isValid()) {
+ byte[] table = update.getTableName();
+ Mutation mutation = update.getUpdate();
+ updateMap.addIndexUpdate(table, mutation);
+
+ // only make the cleanup if we made a put and need cleanup
+ if (needsCleanup) {
+ // there is a TS for the interested columns that is greater than the columns in the
+ // put. Therefore, we need to issue a delete at the same timestamp
+ Delete d = new Delete(mutation.getRow());
+ d.setTimestamp(tracker.getTS());
+ updateMap.addIndexUpdate(table, d);
+ }
+ }
+ }
+ return minTs;
+ }
+
+ /**
+ * Clean up the index based on the current state from the given batch. Iterates over each timestamp (for the indexed
+ * rows) for the current state of the table and cleans up all the existing entries generated by the codec.
+ * <p>
+ * Adds all pending updates to the updateMap
+ *
+ * @param updateMap
+ * updated with the pending index updates from the codec
+ * @param batchTs
+ * timestamp from which we should cleanup
+ * @param state
+ * current state of the primary table. Should already be set up to the correct state from which we want to
+ * clean up.
+ * @throws IOException
+ */
+ private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+ throws IOException {
+ // get the cleanup for the current state
+ state.setCurrentTimestamp(batchTs);
+ addDeleteUpdatesToMap(updateMap, state, batchTs);
+ Set<ColumnTracker> trackers = state.getTrackedColumns();
+ long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+ for (ColumnTracker tracker : trackers) {
+ if (tracker.getTS() < minTs) {
+ minTs = tracker.getTS();
+ }
+ }
+ state.resetTrackedColumns();
+ if (!ColumnTracker.isNewestTime(minTs)) {
+ state.setHints(Lists.newArrayList(trackers));
+ cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
+ }
+ }
+
+ /**
+ * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then add them to the
+ * update map.
+ * <p>
+ * Expects the {@link LocalTableState} to already be correctly set up (correct timestamp, updates applied, etc).
+ *
+ * @throws IOException
+ */
+ protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts)
+ throws IOException {
+ Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
+ if (cleanup != null) {
+ for (IndexUpdate d : cleanup) {
+ if (!d.isValid()) {
+ continue;
+ }
+ // override the timestamps in the delete to match the current batch.
+ Delete remove = (Delete)d.getUpdate();
+ remove.setTimestamp(ts);
+ updateMap.addIndexUpdate(d.getTableName(), remove);
+ }
+ }
+ }
+
+ @Override
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+ throws IOException {
+ // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
+ return null;
+ }
+
+ @Override
+ protected boolean useRawScanToPrimeBlockCache() {
+ return false;
+ }
+}
\ No newline at end of file
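The batching step above groups the cells of a mutation by timestamp, rewriting LATEST_TIMESTAMP cells to 'now' so they all land in one batch. A minimal standalone sketch of that grouping; TsBatch is a hypothetical stand-in for the patch's Batch class, and only the grouping idea is taken from the diff:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical stand-in for org.apache.phoenix.hbase.index.covered.Batch.
class TsBatch {
    private final long timestamp;
    private final List<Cell> kvs = new ArrayList<Cell>();
    TsBatch(long ts) { this.timestamp = ts; }
    void add(Cell kv) { kvs.add(kv); }
    long getTimestamp() { return timestamp; }
    List<Cell> getKvs() { return kvs; }
}

class TimestampBatcher {
    // Group cells by timestamp; cells at LATEST_TIMESTAMP are treated as 'now'
    // so that all of them fall into a single batch.
    static TreeMap<Long, TsBatch> batchByTimestamp(Iterable<Cell> cells, long now) {
        TreeMap<Long, TsBatch> batches = new TreeMap<Long, TsBatch>();
        for (Cell kv : cells) {
            long ts = kv.getTimestamp();
            if (ts == HConstants.LATEST_TIMESTAMP) {
                ts = now; // mirrors kv.updateLatestStamp(nowBytes) in the patch
            }
            TsBatch batch = batches.get(ts);
            if (batch == null) {
                batch = new TsBatch(ts);
                batches.put(ts, batch);
            }
            batch.add(kv);
        }
        return batches;
    }
}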
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 4c4d0b0..b8b2f19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,14 +23,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
/**
* Interface for the current state of the table. This is generally going to be as of a timestamp - a
@@ -52,46 +51,14 @@ public interface TableState {
public long getCurrentTimestamp();
/**
- * Set the current timestamp up to which the table should allow access to the underlying table.
- * This overrides the timestamp view provided by the indexer - use with care!
- * @param timestamp timestamp up to which the table should allow access.
- */
- public void setCurrentTimestamp(long timestamp);
-
- /**
* @return the attributes attached to the current update (e.g. {@link Mutation}).
*/
public Map<String, byte[]> getUpdateAttributes();
/**
- * Get a scanner on the columns that are needed by the index.
- * <p>
- * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
- * columns with a timestamp earlier than the timestamp to which the table is currently set (the
- * current state of the table for which we need to build an update).
- * <p>
- * If none of the passed columns matches any of the columns in the pending update (as determined
- * by {@link ColumnReference#matchesFamily(byte[])} and
- * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This
- * is because it doesn't make sense to build index updates when there is no change in the table
- * state for any of the columns you are indexing.
- * <p>
- * <i>NOTE:</i> This method should <b>not</b> be used during
- * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
- * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
- * need to track the indexed columns.
- * <p>
- * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you
- * request - you will never see a column with the timestamp we are tracking, but the next oldest
- * timestamp for that column.
- * @param indexedColumns the columns to that will be indexed
- * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
- * the builder. Even if no update is necessary for the requested columns, you still need
- * to return the {@link IndexUpdate}, just don't set the update for the
- * {@link IndexUpdate}.
- * @throws IOException
+ * Get a getter interface for the state of the index row
*/
- Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+ Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
Collection<? extends ColumnReference> indexedColumns) throws IOException;
/**
@@ -112,5 +79,7 @@ public interface TableState {
* Can be used to help the codec to determine which columns it should attempt to index.
* @return the keyvalues in the pending update to the table.
*/
- Collection<KeyValue> getPendingUpdate();
+ Collection<Cell> getPendingUpdate();
+
+ Map<String,Object> getContext();
}
\ No newline at end of file
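The narrowed TableState surface trades the old pre-seeked Scanner for a ValueGetter. A hedged sketch of how a codec might consume the new getIndexUpdateState(...) pair; the family and qualifier names are made up for illustration:

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

class GetterUsageSketch {
    // Illustrative only: read the current value of one indexed column through
    // the ValueGetter handed back by the new TableState API.
    static IndexUpdate readOneColumn(TableState state) throws IOException {
        ColumnReference col =
            new ColumnReference(Bytes.toBytes("fam"), Bytes.toBytes("qual"));
        Collection<ColumnReference> cols = Arrays.asList(col);
        Pair<ValueGetter, IndexUpdate> pair = state.getIndexUpdateState(cols);
        // null means the column has no value in the current row state
        ImmutableBytesWritable value = pair.getFirst().getLatestValue(col);
        // a real codec would fold 'value' into the index row key here
        return pair.getSecond();
    }
}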
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
new file mode 100644
index 0000000..d90edc1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TxIndexBuilder extends BaseIndexBuilder {
+ private static final Log LOG = LogFactory.getLog(TxIndexBuilder.class);
+ public static final String TRANSACTION = "TRANSACTION";
+ private TransactionAwareHTable txTable;
+
+ @Override
+ public void setup(RegionCoprocessorEnvironment env) throws IOException {
+ super.setup(env);
+ HTableInterface htable = env.getTable(env.getRegion().getRegionInfo().getTable());
+ this.txTable = new TransactionAwareHTable(htable); // TODO: close?
+ }
+
+ @Override
+ public void stop(String why) {
+ try {
+ if (this.txTable != null) txTable.close();
+ } catch (IOException e) {
+ LOG.warn("Unable to close txTable", e);
+ } finally {
+ super.stop(why);
+ }
+ }
+
+ @Override
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+ // get the index updates for this current batch
+ TxTableState state = new TxTableState(mutation);
+ codec.setContext(state, mutation);
+ Transaction tx = (Transaction)state.getContext().get(TRANSACTION);
+ state.setCurrentTransaction(tx);
+ Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayListWithExpectedSize(2);
+ Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state);
+ for (IndexUpdate delete : deletes) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+ }
+ state.addPendingUpdates(mutation);
+ // TODO: state may need to maintain the old state with the new state super imposed
+ // An alternate easier way would be to calculate the state after the data mutations
+ // have been applied.
+ Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+ for (IndexUpdate update : updates) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+ }
+ return indexUpdates;
+ }
+
+ private class TxTableState implements TableState {
+ private Put put;
+ private Map<String, byte[]> attributes;
+ private List<Cell> pendingUpdates = Lists.newArrayList();
+ private Transaction transaction;
+ private final Map<String,Object> context = Maps.newHashMap();
+
+ public TxTableState(Mutation m) {
+ this.put = new Put(m.getRow());
+ this.attributes = m.getAttributesMap();
+ }
+
+ @Override
+ public RegionCoprocessorEnvironment getEnvironment() {
+ return env;
+ }
+
+ @Override
+ public long getCurrentTimestamp() {
+ return transaction.getReadPointer();
+ }
+
+ @Override
+ public Map<String, byte[]> getUpdateAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public byte[] getCurrentRowKey() {
+ return put.getRow();
+ }
+
+ @Override
+ public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+ return Collections.emptyList();
+ }
+
+ public void addPendingUpdate(Cell cell) throws IOException {
+ put.add(cell);
+ }
+
+ public void addPendingUpdates(Mutation m) throws IOException {
+ if (m instanceof Delete) {
+ put.getFamilyCellMap().clear();
+ } else {
+ CellScanner scanner = m.cellScanner();
+ while (scanner.advance()) {
+ Cell cell = scanner.current();
+ if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ put.add(family, qualifier, HConstants.EMPTY_BYTE_ARRAY);
+ } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ put.getFamilyCellMap().remove(family);
+ } else {
+ put.add(cell);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Collection<Cell> getPendingUpdate() {
+ return pendingUpdates;
+ }
+
+ public void setCurrentTransaction(Transaction tx) {
+ this.transaction = tx;
+ }
+
+ @Override
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ throws IOException {
+ ColumnTracker tracker = new ColumnTracker(indexedColumns);
+ IndexUpdate indexUpdate = new IndexUpdate(tracker);
+ final byte[] rowKey = getCurrentRowKey();
+ if (!pendingUpdates.isEmpty()) {
+ final Map<ColumnReference, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
+ .size());
+ for (Cell kv : pendingUpdates) {
+ // create new pointers to each part of the kv
+ ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
+ }
+ ValueGetter getter = new ValueGetter() {
+ @Override
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
+ // TODO: from IndexMaintainer we return null if ref is empty key value. Needed?
+ return valueMap.get(ref);
+ }
+ @Override
+ public byte[] getRowKey() {
+ return rowKey;
+ }
+ };
+ return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
+ }
+ // Establish initial state of table by finding the old values
+ // We'll apply the Mutation to this next
+ Get get = new Get(rowKey);
+ get.setMaxVersions();
+ for (ColumnReference ref : indexedColumns) {
+ get.addColumn(ref.getFamily(), ref.getQualifier());
+ }
+ txTable.startTx(transaction);
+ final Result result = txTable.get(get);
+ ValueGetter getter = new ValueGetter() {
+
+ @Override
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
+ Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+ if (cell == null) {
+ return null;
+ }
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ return ptr;
+ }
+
+ @Override
+ public byte[] getRowKey() {
+ return rowKey;
+ }
+
+ };
+ for (ColumnReference ref : indexedColumns) {
+ Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+ if (cell != null) {
+ addPendingUpdate(cell);
+ }
+ }
+ return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
+ }
+
+ @Override
+ public Map<String, Object> getContext() {
+ return context;
+ }
+
+ }
+
+ @Override
+ protected boolean useRawScanToPrimeBlockCache() {
+ return true;
+ }
+}
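TxIndexBuilder.getIndexUpdate(...) above assumes the codec's setContext(...) has stashed a Tephra Transaction under the TRANSACTION key before the builder reads it back. A sketch of a codec honoring that contract, under the assumption that the client shipped the serialized transaction as a mutation attribute named "tx"; decodeTx is a hypothetical helper, since the patch does not show where the transaction bytes come from:

import java.io.IOException;

import co.cask.tephra.Transaction;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
import org.apache.phoenix.index.BaseIndexCodec;

abstract class TxAwareCodecSketch extends BaseIndexCodec {
    @Override
    public void setContext(TableState state, Mutation mutation) throws IOException {
        // Assumption: the client serialized the transaction into a mutation
        // attribute named "tx"; decodeTx below is a hypothetical helper.
        byte[] txBytes = mutation.getAttribute("tx");
        state.getContext().put(TxIndexBuilder.TRANSACTION, decodeTx(txBytes));
    }

    // hypothetical: turn the attribute bytes back into a Tephra Transaction
    protected abstract Transaction decodeTx(byte[] txBytes) throws IOException;
}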
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
index 96a7410..52076a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -22,8 +22,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;
@@ -36,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
public class LazyValueGetter implements ValueGetter {
private Scanner scan;
- private volatile Map<ColumnReference, ImmutableBytesPtr> values;
+ private volatile Map<ColumnReference, ImmutableBytesWritable> values;
private byte[] row;
/**
@@ -50,16 +52,16 @@ public class LazyValueGetter implements ValueGetter {
}
@Override
- public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
// ensure we have a backing map
if (values == null) {
synchronized (this) {
- values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesPtr>());
+ values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesWritable>());
}
}
// check the value in the map
- ImmutableBytesPtr value = values.get(ref);
+ ImmutableBytesWritable value = values.get(ref);
if (value == null) {
value = get(ref);
values.put(ref, value);
@@ -78,9 +80,9 @@ public class LazyValueGetter implements ValueGetter {
return null;
}
// there is a next value - we only care about the current value, so we can just snag that
- KeyValue next = scan.next();
- if (ref.matches(next)) {
- return new ImmutableBytesPtr(next.getBuffer(), next.getValueOffset(), next.getValueLength());
+ Cell next = scan.next();
+ if (ref.matches(KeyValueUtil.ensureKeyValue(next))) {
+ return new ImmutableBytesPtr(next.getValueArray(), next.getValueOffset(), next.getValueLength());
}
return null;
}
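One caveat in the lazy initialization above: the null check is not repeated inside the synchronized block, so two racing callers can each install a fresh map and one may discard entries the other already cached (benign here, since values are re-fetchable from the scanner, but worth noting). The conventional double-checked form, sketched for comparison:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

class LazyMapHolder {
    private volatile Map<ColumnReference, ImmutableBytesWritable> values;

    Map<ColumnReference, ImmutableBytesWritable> lazyValues() {
        // Double-checked locking: the second null check prevents a racing
        // thread from replacing a map another thread already populated.
        if (values == null) {
            synchronized (this) {
                if (values == null) {
                    values = Collections.synchronizedMap(
                        new HashMap<ColumnReference, ImmutableBytesWritable>());
                }
            }
        }
        return values;
    }
}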
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index aa209a5..d4bd460 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -1,19 +1,11 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered.example;
@@ -24,6 +16,7 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
@@ -31,336 +24,343 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.index.BaseIndexCodec;
+import com.google.common.collect.Lists;
+
/**
*
*/
public class CoveredColumnIndexCodec extends BaseIndexCodec {
- private static final byte[] EMPTY_BYTES = new byte[0];
- public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
-
- private List<ColumnGroup> groups;
-
- /**
- * @param groups to initialize the codec with
- * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing
- * purposes
- */
- public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
- CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
- codec.groups = Lists.newArrayList(groups);
- return codec;
- }
-
- @Override
- public void initialize(RegionCoprocessorEnvironment env) {
- groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
- }
-
- @Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
- List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
- for (ColumnGroup group : groups) {
- IndexUpdate update = getIndexUpdateForGroup(group, state);
- updates.add(update);
- }
- return updates;
- }
-
- /**
- * @param group
- * @param state
- * @return the update that should be made to the table
- */
- private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
- List<CoveredColumn> refs = group.getColumns();
- try {
- Pair<Scanner, IndexUpdate> stateInfo = state.getIndexedColumnsTableState(refs);
- Scanner kvs = stateInfo.getFirst();
- Pair<Integer, List<ColumnEntry>> columns =
- getNextEntries(refs, kvs, state.getCurrentRowKey());
- // make sure we close the scanner
- kvs.close();
- if (columns.getFirst().intValue() == 0) {
- return stateInfo.getSecond();
- }
- // have all the column entries, so just turn it into a Delete for the row
- // convert the entries to the needed values
- byte[] rowKey =
- composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
- Put p = new Put(rowKey, state.getCurrentTimestamp());
- // add the columns to the put
- addColumnsToPut(p, columns.getSecond());
-
- // update the index info
- IndexUpdate update = stateInfo.getSecond();
- update.setTable(Bytes.toBytes(group.getTable()));
- update.setUpdate(p);
- return update;
- } catch (IOException e) {
- throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
- }
- }
-
- private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
- // add each of the corresponding families to the put
- int count = 0;
- for (ColumnEntry column : columns) {
- indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
- ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
+ private static final byte[] EMPTY_BYTES = new byte[0];
+ public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
+
+ private List<ColumnGroup> groups;
+
+ /**
+ * @param groups
+ * to initialize the codec with
+ * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes
+ */
+ public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
+ CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
+ codec.groups = Lists.newArrayList(groups);
+ return codec;
}
- }
-
- private static byte[] toIndexQualifier(CoveredColumn column) {
- return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR),
- column.getQualifier());
- }
-
- @Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
- List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
- for (ColumnGroup group : groups) {
- deletes.add(getDeleteForGroup(group, state));
+
+ @Override
+ public void initialize(RegionCoprocessorEnvironment env) {
+ groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
}
- return deletes;
- }
-
-
- /**
- * Get all the deletes necessary for a group of columns - logically, the cleanup the index table
- * for a given index.
- * @param group index information
- * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
- */
- private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
- List<CoveredColumn> refs = group.getColumns();
- try {
- Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs);
- Pair<Integer, List<ColumnEntry>> columns =
- getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
- // make sure we close the scanner reference
- kvs.getFirst().close();
- // no change, just return the passed update
- if (columns.getFirst() == 0) {
- return kvs.getSecond();
- }
- // have all the column entries, so just turn it into a Delete for the row
- // convert the entries to the needed values
- byte[] rowKey =
- composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
- Delete d = new Delete(rowKey);
- d.setTimestamp(state.getCurrentTimestamp());
- IndexUpdate update = kvs.getSecond();
- update.setUpdate(d);
- update.setTable(Bytes.toBytes(group.getTable()));
- return update;
- } catch (IOException e) {
- throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
+
+ @Override
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+ List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+ for (ColumnGroup group : groups) {
+ IndexUpdate update = getIndexUpdateForGroup(group, state);
+ updates.add(update);
+ }
+ return updates;
}
- }
-
- /**
- * Get the next batch of primary table values for the given columns
- * @param refs columns to match against
- * @param state
- * @return the total length of all values found and the entries to add for the index
- */
- private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs,
- byte[] currentRow) throws IOException {
- int totalValueLength = 0;
- List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
-
- // pull out the latest state for each column reference, in order
- for (CoveredColumn ref : refs) {
- KeyValue first = ref.getFirstKeyValueForRow(currentRow);
- if (!kvs.seek(first)) {
- // no more keys, so add a null value
- entries.add(new ColumnEntry(null, ref));
- continue;
- }
- // there is a next value - we only care about the current value, so we can just snag that
- KeyValue next = kvs.next();
- if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
- byte[] v = next.getValue();
- totalValueLength += v.length;
- entries.add(new ColumnEntry(v, ref));
- } else {
- // this first one didn't match at all, so we have to put in a null entry
- entries.add(new ColumnEntry(null, ref));
- continue;
- }
- // here's where is gets a little tricky - we either need to decide if we should continue
- // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
- if (!ref.allColumns()) {
- continue;
- }
- // matches all columns, so we need to iterate until we hit the next column with the same
- // family as the current key
- byte[] lastQual = next.getQualifier();
- byte[] nextQual = null;
- while ((next = kvs.next()) != null) {
- // different family, done with this column
- if (!ref.matchesFamily(next.getFamily())) {
- break;
+
+ /**
+ * @param group
+ * @param state
+ * @return the update that should be made to the table
+ */
+ private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
+ List<CoveredColumn> refs = group.getColumns();
+ try {
+ Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+ Scanner kvs = stateInfo.getFirst();
+ Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
+ // make sure we close the scanner
+ kvs.close();
+ if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); }
+ // have all the column entries, so just turn it into a Delete for the row
+ // convert the entries to the needed values
+ byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+ Put p = new Put(rowKey, state.getCurrentTimestamp());
+ // add the columns to the put
+ addColumnsToPut(p, columns.getSecond());
+
+ // update the index info
+ IndexUpdate update = stateInfo.getSecond();
+ update.setTable(Bytes.toBytes(group.getTable()));
+ update.setUpdate(p);
+ return update;
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
}
- nextQual = next.getQualifier();
- // we are still on the same qualifier - skip it, since we already added a column for it
- if (Arrays.equals(lastQual, nextQual)) {
- continue;
+ }
+
+ private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
+ // add each of the corresponding families to the put
+ int count = 0;
+ for (ColumnEntry column : columns) {
+ indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
+ ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
}
- // this must match the qualifier since its an all-qualifiers specifier, so we add it
- byte[] v = next.getValue();
- totalValueLength += v.length;
- entries.add(new ColumnEntry(v, ref));
- // update the last qualifier to check against
- lastQual = nextQual;
- }
}
- return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
- }
- static class ColumnEntry {
- byte[] value = EMPTY_BYTES;
- CoveredColumn ref;
+ private static byte[] toIndexQualifier(CoveredColumn column) {
+ return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier());
+ }
- public ColumnEntry(byte[] value, CoveredColumn ref) {
- this.value = value == null ? EMPTY_BYTES : value;
- this.ref = ref;
+ @Override
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+ List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+ for (ColumnGroup group : groups) {
+ deletes.add(getDeleteForGroup(group, state));
+ }
+ return deletes;
+ }
+
+ /**
+ * Get all the deletes necessary for a group of columns - logically, the cleanup of the index table for a given index.
+ *
+ * @param group
+ * index information
+ * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
+ */
+ private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
+ List<CoveredColumn> refs = group.getColumns();
+ try {
+ Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+ Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
+ // make sure we close the scanner reference
+ kvs.getFirst().close();
+ // no change, just return the passed update
+ if (columns.getFirst() == 0) { return kvs.getSecond(); }
+ // have all the column entries, so just turn it into a Delete for the row
+ // convert the entries to the needed values
+ byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+ Delete d = new Delete(rowKey);
+ d.setTimestamp(state.getCurrentTimestamp());
+ IndexUpdate update = kvs.getSecond();
+ update.setUpdate(d);
+ update.setTable(Bytes.toBytes(group.getTable()));
+ return update;
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
+ }
}
- }
-
- /**
- * Compose the final index row key.
- * <p>
- * This is faster than adding each value independently as we can just build a single a array and
- * copy everything over once.
- * @param pk primary key of the original row
- * @param length total number of bytes of all the values that should be added
- * @param values to use when building the key
- */
- static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
- // now build up expected row key, each of the values, in order, followed by the PK and then some
- // info about lengths so we can deserialize each value
- byte[] output = new byte[length + pk.length];
- int pos = 0;
- int[] lengths = new int[values.size()];
- int i = 0;
- for (ColumnEntry entry : values) {
- byte[] v = entry.value;
- // skip doing the copy attempt, if we don't need to
- if (v.length != 0) {
- System.arraycopy(v, 0, output, pos, v.length);
- pos += v.length;
- }
- lengths[i++] = v.length;
+
+ /**
+ * Get the next batch of primary table values for the given columns
+ *
+ * @param refs
+ * columns to match against
+ * @param state
+ * @return the total length of all values found and the entries to add for the index
+ */
+ private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
+ throws IOException {
+ int totalValueLength = 0;
+ List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
+
+ // pull out the latest state for each column reference, in order
+ for (CoveredColumn ref : refs) {
+ KeyValue first = ref.getFirstKeyValueForRow(currentRow);
+ if (!kvs.seek(first)) {
+ // no more keys, so add a null value
+ entries.add(new ColumnEntry(null, ref));
+ continue;
+ }
+ // there is a next value - we only care about the current value, so we can just snag that
+ Cell next = kvs.next();
+ if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
+ byte[] v = next.getValue();
+ totalValueLength += v.length;
+ entries.add(new ColumnEntry(v, ref));
+ } else {
+ // this first one didn't match at all, so we have to put in a null entry
+ entries.add(new ColumnEntry(null, ref));
+ continue;
+ }
+ // here's where it gets a little tricky - we either need to decide if we should continue
+ // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
+ if (!ref.allColumns()) {
+ continue;
+ }
+ // matches all columns, so we need to iterate until we hit the next column with the same
+ // family as the current key
+ byte[] lastQual = next.getQualifier();
+ byte[] nextQual = null;
+ while ((next = kvs.next()) != null) {
+ // different family, done with this column
+ if (!ref.matchesFamily(next.getFamily())) {
+ break;
+ }
+ nextQual = next.getQualifier();
+ // we are still on the same qualifier - skip it, since we already added a column for it
+ if (Arrays.equals(lastQual, nextQual)) {
+ continue;
+ }
+ // this must match the qualifier since it's an all-qualifiers specifier, so we add it
+ byte[] v = next.getValue();
+ totalValueLength += v.length;
+ entries.add(new ColumnEntry(v, ref));
+ // update the last qualifier to check against
+ lastQual = nextQual;
+ }
+ }
+ return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
}
- // add the primary key to the end of the row key
- System.arraycopy(pk, 0, output, pos, pk.length);
+ static class ColumnEntry {
+ byte[] value = EMPTY_BYTES;
+ CoveredColumn ref;
- // add the lengths as suffixes so we can deserialize the elements again
- for (int l : lengths) {
- output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ public ColumnEntry(byte[] value, CoveredColumn ref) {
+ this.value = value == null ? EMPTY_BYTES : value;
+ this.ref = ref;
+ }
}
- // and the last integer is the number of values
- return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
- }
-
- /**
- * Essentially a short-cut from building a {@link Put}.
- * @param pk row key
- * @param timestamp timestamp of all the keyvalues
- * @param values expected value--column pair
- * @return a keyvalues that the index contains for a given row at a timestamp with the given value
- * -- column pairs.
- */
- public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
- List<Pair<byte[], CoveredColumn>> values) {
-
- int length = 0;
- List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
- for (Pair<byte[], CoveredColumn> value : values) {
- ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
- length += value.getFirst().length;
- expected.add(entry);
+ /**
+ * Compose the final index row key.
+ * <p>
+ * This is faster than adding each value independently as we can just build a single array and copy everything
+ * over once.
+ *
+ * @param pk
+ * primary key of the original row
+ * @param length
+ * total number of bytes of all the values that should be added
+ * @param values
+ * to use when building the key
+ */
+ static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+ // now build up expected row key, each of the values, in order, followed by the PK and then some
+ // info about lengths so we can deserialize each value
+ byte[] output = new byte[length + pk.length];
+ int pos = 0;
+ int[] lengths = new int[values.size()];
+ int i = 0;
+ for (ColumnEntry entry : values) {
+ byte[] v = entry.value;
+ // skip doing the copy attempt, if we don't need to
+ if (v.length != 0) {
+ System.arraycopy(v, 0, output, pos, v.length);
+ pos += v.length;
+ }
+ lengths[i++] = v.length;
+ }
+
+ // add the primary key to the end of the row key
+ System.arraycopy(pk, 0, output, pos, pk.length);
+
+ // add the lengths as suffixes so we can deserialize the elements again
+ for (int l : lengths) {
+ output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ }
+
+ // and the last integer is the number of values
+ return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
}
-
- byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
- Put p = new Put(rowKey, timestamp);
- CoveredColumnIndexCodec.addColumnsToPut(p, expected);
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
- kvs.addAll(entry.getValue());
+
+ /**
+ * Essentially a short-cut from building a {@link Put}.
+ *
+ * @param pk
+ * row key
+ * @param timestamp
+ * timestamp of all the keyvalues
+ * @param values
+ * expected value--column pair
+ * @return the keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs.
+ */
+ public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
+ List<Pair<byte[], CoveredColumn>> values) {
+
+ int length = 0;
+ List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
+ for (Pair<byte[], CoveredColumn> value : values) {
+ ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
+ length += value.getFirst().length;
+ expected.add(entry);
+ }
+
+ byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
+ Put p = new Put(rowKey, timestamp);
+ CoveredColumnIndexCodec.addColumnsToPut(p, expected);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
+ kvs.addAll(entry.getValue());
+ }
+
+ return kvs;
}
-
- return kvs;
- }
-
- public static List<byte[]> getValues(byte[] bytes) {
- // get the total number of keys in the bytes
- int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
- List<byte[]> keys = new ArrayList<byte[]>(keyCount);
- int[] lengths = new int[keyCount];
- int lengthPos = keyCount - 1;
- int pos = bytes.length - Bytes.SIZEOF_INT;
- // figure out the length of each key
- for (int i = 0; i < keyCount; i++) {
- lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
- pos -= Bytes.SIZEOF_INT;
+
+ public static List<byte[]> getValues(byte[] bytes) {
+ // get the total number of keys in the bytes
+ int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+ List<byte[]> keys = new ArrayList<byte[]>(keyCount);
+ int[] lengths = new int[keyCount];
+ int lengthPos = keyCount - 1;
+ int pos = bytes.length - Bytes.SIZEOF_INT;
+ // figure out the length of each key
+ for (int i = 0; i < keyCount; i++) {
+ lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+ pos -= Bytes.SIZEOF_INT;
+ }
+
+ int current = 0;
+ for (int length : lengths) {
+ byte[] key = Arrays.copyOfRange(bytes, current, current + length);
+ keys.add(key);
+ current += length;
+ }
+
+ return keys;
}
- int current = 0;
- for (int length : lengths) {
- byte[] key = Arrays.copyOfRange(bytes, current, current + length);
- keys.add(key);
- current += length;
+ /**
+ * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
+ *
+ * @param bytes
+ * array to read from
+ * @param start
+ * start point, backwards from which to read. For example, if specifying "25", we would try to read an
+ * integer from 21 -> 25
+ * @return an integer from the preceding {@value Bytes#SIZEOF_INT} bytes, if it exists.
+ */
+ private static int getPreviousInteger(byte[] bytes, int start) {
+ return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
}
- return keys;
- }
-
- /**
- * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
- * @param bytes array to read from
- * @param start start point, backwards from which to read. For example, if specifying "25", we
- * would try to read an integer from 21 -> 25
- * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists.
- */
- private static int getPreviousInteger(byte[] bytes, int start) {
- return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
- }
-
- /**
- * Check to see if an row key just contains a list of null values.
- * @param bytes row key to examine
- * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
- */
- public static boolean checkRowKeyForAllNulls(byte[] bytes) {
- int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
- int pos = bytes.length - Bytes.SIZEOF_INT;
- for (int i = 0; i < keyCount; i++) {
- int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
- if (next > 0) {
- return false;
- }
- pos -= Bytes.SIZEOF_INT;
+ /**
+ * Check to see if a row key just contains a list of null values.
+ *
+ * @param bytes
+ * row key to examine
+ * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
+ */
+ public static boolean checkRowKeyForAllNulls(byte[] bytes) {
+ int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+ int pos = bytes.length - Bytes.SIZEOF_INT;
+ for (int i = 0; i < keyCount; i++) {
+ int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+ if (next > 0) { return false; }
+ pos -= Bytes.SIZEOF_INT;
+ }
+
+ return true;
}
- return true;
- }
+ @Override
+ public boolean isEnabled(Mutation m) {
+ // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
+ // simple check for the moment.
+ return groups.size() > 0;
+ }
- @Override
- public boolean isEnabled(Mutation m) {
- // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
- // simple check for the moment.
- return groups.size() > 0;
- }
+ @Override
+ public void setContext(TableState state, Mutation mutation) throws IOException {}
}
\ No newline at end of file
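The composeRowKey(...)/getValues(...) pair is a simple codec: the values in order, then the primary key, then one serialized int length per value, then the value count. A round-trip sketch, assuming same-package access (composeRowKey and ColumnEntry are package-private) and that CoveredColumn exposes a (String family, byte[] qualifier) constructor, which is an assumption:

package org.apache.phoenix.hbase.index.covered.example;

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec.ColumnEntry;

// Same-package sketch: composeRowKey(...) and ColumnEntry are package-private.
class RowKeyRoundTrip {
    public static void main(String[] args) {
        byte[] pk = Bytes.toBytes("row1");
        byte[] value = Bytes.toBytes("v1");
        // hypothetical column; the constructor shape is an assumption
        CoveredColumn col = new CoveredColumn("fam", Bytes.toBytes("qual"));
        List<ColumnEntry> entries = Arrays.asList(new ColumnEntry(value, col));

        // layout: values, then PK, then one int length per value, then the count
        byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, value.length, entries);

        // decoding recovers the value sections in order
        List<byte[]> decoded = CoveredColumnIndexCodec.getValues(rowKey);
        System.out.println(Bytes.toStringBinary(decoded.get(0))); // v1
    }
}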
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
index 6ac89d1..48c714d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
/**
@@ -136,7 +136,7 @@ public class CoveredColumnIndexSpecifierBuilder {
void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException {
// add the codec for the index to the map of options
Map<String, String> opts = this.convertToMap();
- opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER);
}
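For context, build(...) above reduces to wiring the codec class name into the indexer options. A sketch of enabling the example index on a table descriptor, using only the calls visible in this diff:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexer;

class EnableExampleIndex {
    static void enable(HTableDescriptor desc) throws IOException {
        Map<String, String> opts = new HashMap<String, String>();
        // point the renamed NonTxIndexBuilder at the example codec
        opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
                CoveredColumnIndexCodec.class.getName());
        // same call the builder makes internally
        Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts,
                Coprocessor.PRIORITY_USER);
    }
}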
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index f80cf41..de8f752 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -35,8 +36,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
/**
@@ -90,7 +91,7 @@ import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
* <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of the entire row
* <i>every time there is a write to the table</i>.
*/
-public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
+public class CoveredColumnIndexer extends NonTxIndexBuilder {
/**
* Create the specified index table with the necessary columns
@@ -125,9 +126,9 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
Collection<Batch> batches = batchByRow(filtered);
for (Batch batch : batches) {
- KeyValue curKV = batch.getKvs().iterator().next();
+ Cell curKV = batch.getKvs().iterator().next();
Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength());
- for (KeyValue kv : batch.getKvs()) {
+ for (Cell kv : batch.getKvs()) {
// we only need to cleanup Put entries
byte type = kv.getTypeByte();
Type t = KeyValue.Type.codeToType(type);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
index b9f3858..7c69493 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.hbase.index.covered.update;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
@@ -45,7 +44,7 @@ public class ColumnTracker implements IndexedColumnGroup {
public ColumnTracker(Collection<? extends ColumnReference> columns) {
this.columns = new ArrayList<ColumnReference>(columns);
// sort the columns
- Collections.sort(this.columns);
+ // no need to do this: Collections.sort(this.columns);
this.hashCode = calcHashCode(this.columns);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index d09498a..26f620f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
/**
* Keeps track of the index updates
*/
@@ -182,8 +181,6 @@ public class IndexUpdateManager {
for (Entry<ImmutableBytesPtr, Collection<Mutation>> updates : map.entrySet()) {
// get is ok because we always set with just the bytes
byte[] tableName = updates.getKey().get();
- // TODO replace this as just storing a byte[], to avoid all the String <-> byte[] swapping
- // HBase does
for (Mutation m : updates.getValue()) {
// skip elements that have been marked for delete
if (shouldBeRemoved(m)) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
index a28268c..884cca6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.hbase.index.scanner;
import java.io.IOException;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
/**
@@ -29,17 +29,17 @@ import org.apache.hadoop.hbase.KeyValue;
public class EmptyScanner implements Scanner {
@Override
- public KeyValue next() throws IOException {
+ public Cell next() throws IOException {
return null;
}
@Override
- public boolean seek(KeyValue next) throws IOException {
+ public boolean seek(Cell next) throws IOException {
return false;
}
@Override
- public KeyValue peek() throws IOException {
+ public Cell peek() throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
index 868e892..9454de5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.hbase.index.scanner;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
/**
@@ -33,7 +34,7 @@ public interface Scanner extends Closeable {
* @return the next keyvalue in the scanner or <tt>null</tt> if there is no next {@link KeyValue}
* @throws IOException if there is an underlying error reading the data
*/
- public KeyValue next() throws IOException;
+ public Cell next() throws IOException;
/**
* Seek to immediately before the given {@link KeyValue}. If that exact {@link KeyValue} is
@@ -43,7 +44,7 @@ public interface Scanner extends Closeable {
* @return <tt>true</tt> if there are values left in <tt>this</tt>, <tt>false</tt> otherwise
* @throws IOException if there is an error reading the underlying data.
*/
- public boolean seek(KeyValue next) throws IOException;
+ public boolean seek(Cell next) throws IOException;
/**
* Read the {@link KeyValue} at the top of <tt>this</tt> without 'popping' it off the top of the
@@ -51,5 +52,5 @@ public interface Scanner extends Closeable {
* @return the next {@link KeyValue} or <tt>null</tt> if there are no more values in <tt>this</tt>
* @throws IOException if there is an error reading the underlying data.
*/
- public KeyValue peek() throws IOException;
+ public Cell peek() throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 32e4d84..362ef18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -23,7 +23,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -126,7 +128,7 @@ public class ScannerBuilder {
return new Scanner() {
@Override
- public KeyValue next() {
+ public Cell next() {
try {
return kvScanner.next();
} catch (IOException e) {
@@ -135,7 +137,7 @@ public class ScannerBuilder {
}
@Override
- public boolean seek(KeyValue next) throws IOException {
+ public boolean seek(Cell next) throws IOException {
// check to see if the next kv is after the current key, in which case we can use reseek,
// which will be more efficient
KeyValue peek = kvScanner.peek();
@@ -143,17 +145,17 @@ public class ScannerBuilder {
if (peek != null) {
int compare = KeyValue.COMPARATOR.compare(peek, next);
if (compare < 0) {
- return kvScanner.reseek(next);
+ return kvScanner.reseek(KeyValueUtil.ensureKeyValue(next));
} else if (compare == 0) {
// we are already at the given key!
return true;
}
}
- return kvScanner.seek(next);
+ return kvScanner.seek(KeyValueUtil.ensureKeyValue(next));
}
@Override
- public KeyValue peek() throws IOException {
+ public Cell peek() throws IOException {
return kvScanner.peek();
}
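A note on the seek() change above: the Scanner interface now accepts Cell, but the underlying KeyValueScanner in this HBase version still seeks by KeyValue, so the builder bridges the two with KeyValueUtil.ensureKeyValue. A minimal sketch of that bridge, assuming a 0.98-style KeyValueScanner whose seek/reseek still take KeyValue; the class and method names below are illustrative, not part of the commit:

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.regionserver.KeyValueScanner;

    public class CellSeekBridge {
        /**
         * Seek a KeyValue-based scanner to a Cell, preferring the cheaper
         * forward-only reseek when the scanner is already positioned before
         * the target.
         */
        public static boolean seekTo(KeyValueScanner scanner, Cell next) throws IOException {
            KeyValue peek = scanner.peek();
            if (peek != null) {
                int compare = KeyValue.COMPARATOR.compare(peek, next);
                if (compare < 0) {
                    // already before the target: reseek is enough
                    return scanner.reseek(KeyValueUtil.ensureKeyValue(next));
                } else if (compare == 0) {
                    return true; // already positioned at the target
                }
            }
            // fall back to a full seek; ensureKeyValue copies only when
            // 'next' is not already a KeyValue
            return scanner.seek(KeyValueUtil.ensureKeyValue(next));
        }
    }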
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
index 5964647..3335aaa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
@@ -22,16 +22,14 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import com.google.common.collect.Multimap;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-
/**
* Handle failures to write to the index tables.
*/
public interface IndexFailurePolicy extends Stoppable {
-
public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 4f785eb..3500dd2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -766,7 +766,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
int i = 0;
for (ColumnReference ref : this.getCoverededColumns()) {
ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
- ImmutableBytesPtr value = valueGetter.getLatestValue(ref);
+ ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
if (value != null) {
@@ -781,9 +781,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return put;
}
- public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+ public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
int nDeleteCF = 0;
- for (KeyValue kv : pendingUpdates) {
+ for (Cell kv : pendingUpdates) {
if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
nDeleteCF++;
boolean isEmptyCF = Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
@@ -798,18 +798,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return nDeleteCF == this.nDataCFs;
}
- private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates) throws IOException {
+ private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<Cell> pendingUpdates) throws IOException {
if (pendingUpdates.isEmpty()) {
return false;
}
- Map<ColumnReference,KeyValue> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size());
- for (KeyValue kv : pendingUpdates) {
+ Map<ColumnReference,Cell> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size());
+ for (Cell kv : pendingUpdates) {
newState.put(new ColumnReference(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)), kv);
}
for (ColumnReference ref : indexedColumns) {
- KeyValue newValue = newState.get(ref);
+ Cell newValue = newState.get(ref);
if (newValue != null) { // Indexed column has potentially changed
- ImmutableBytesPtr oldValue = oldState.getLatestValue(ref);
+ ImmutableBytesWritable oldValue = oldState.getLatestValue(ref);
boolean newValueSetAsNull = newValue.getTypeByte() == Type.DeleteColumn.getCode();
//If the new column value has to be set as null and the older value is null too,
//then just skip to the next indexed column.
@@ -834,11 +834,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
* since we can build the corresponding index row key.
*/
public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException {
- return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null);
+ return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<Cell>emptyList(), ts, null, null);
}
@SuppressWarnings("deprecation")
- public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+ public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
// Delete the entire row if any of the indexed columns changed
if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
@@ -848,7 +848,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
Delete delete = null;
// Delete columns for missing key values
- for (KeyValue kv : pendingUpdates) {
+ for (Cell kv : pendingUpdates) {
if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier());
if (coveredColumns.contains(ref)) {
@@ -1304,14 +1304,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
.size());
for (Cell kv : pendingUpdates) {
// create new pointers to each part of the kv
- ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
- ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength());
ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
- valueMap.put(new ColumnReference(kv.getRowArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
+ valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
}
return new ValueGetter() {
@Override
- public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
if(ref.equals(dataEmptyKeyValueRef)) return null;
return valueMap.get(ref);
}
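One detail in the hunk above deserves a call-out: the old code paired kv.getRowArray() with the family and qualifier offsets, which reads the wrong bytes whenever a Cell's components live in different backing arrays. The rule is that each offset/length pair is only valid against its matching array. A minimal sketch of the corrected pairing (the helper class is illustrative; ColumnReference and its six-argument constructor are the Phoenix pieces shown in the diff):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    public class ColumnReferenceBuilder {
        public static ColumnReference buildColumnReference(Cell kv) {
            // getFamilyOffset()/getFamilyLength() are only valid against
            // getFamilyArray(); pairing them with getRowArray() reads
            // garbage bytes (the pre-fix bug above)
            return new ColumnReference(
                kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
                kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
        }
    }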
[3/3] phoenix git commit: Secondary indexing with txns
Posted by ja...@apache.org.
Secondary indexing with txns
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5a558e16
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5a558e16
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5a558e16
Branch: refs/heads/txn
Commit: 5a558e16cd7b1e882b274683aff6b655f952dac1
Parents: 826ebf5
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Mar 27 10:35:37 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Mar 27 10:35:37 2015 -0700
----------------------------------------------------------------------
.../EndToEndCoveredColumnsIndexBuilderIT.java | 12 +-
.../covered/example/FailWithoutRetriesIT.java | 198 +++---
.../apache/phoenix/hbase/index/ValueGetter.java | 4 +-
.../hbase/index/builder/BaseIndexBuilder.java | 170 +++--
.../hbase/index/builder/IndexBuildManager.java | 14 -
.../hbase/index/builder/IndexBuilder.java | 47 +-
.../phoenix/hbase/index/covered/Batch.java | 11 +-
.../covered/CoveredColumnsIndexBuilder.java | 491 ---------------
.../phoenix/hbase/index/covered/IndexCodec.java | 172 +++--
.../hbase/index/covered/LocalTableState.java | 453 +++++++------
.../hbase/index/covered/NonTxIndexBuilder.java | 405 ++++++++++++
.../phoenix/hbase/index/covered/TableState.java | 45 +-
.../hbase/index/covered/TxIndexBuilder.java | 247 ++++++++
.../index/covered/data/LazyValueGetter.java | 18 +-
.../example/CoveredColumnIndexCodec.java | 628 +++++++++----------
.../CoveredColumnIndexSpecifierBuilder.java | 4 +-
.../covered/example/CoveredColumnIndexer.java | 9 +-
.../index/covered/update/ColumnTracker.java | 3 +-
.../covered/update/IndexUpdateManager.java | 5 +-
.../hbase/index/scanner/EmptyScanner.java | 8 +-
.../phoenix/hbase/index/scanner/Scanner.java | 7 +-
.../hbase/index/scanner/ScannerBuilder.java | 12 +-
.../hbase/index/write/IndexFailurePolicy.java | 4 +-
.../apache/phoenix/index/IndexMaintainer.java | 28 +-
.../phoenix/index/PhoenixIndexBuilder.java | 90 +--
.../apache/phoenix/index/PhoenixIndexCodec.java | 145 ++---
.../index/PhoenixIndexFailurePolicy.java | 2 +-
.../phoenix/index/PhoenixTxIndexBuilder.java | 53 ++
.../index/PhoenixTxIndexFailurePolicy.java | 50 ++
.../query/ConnectionQueryServicesImpl.java | 10 +-
.../phoenix/schema/tuple/ValueGetterTuple.java | 16 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 64 +-
.../covered/CoveredIndexCodecForTesting.java | 93 ++-
.../index/covered/TestLocalTableState.java | 20 +-
.../example/TestCoveredColumnIndexCodec.java | 16 +-
.../phoenix/index/IndexMaintainerTest.java | 2 +-
36 files changed, 1904 insertions(+), 1652 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 84c6827..fa85f00 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -59,7 +59,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
- * End-to-End test of just the {@link CoveredColumnsIndexBuilder}, but with a simple
+ * End-to-End test of just the {@link NonTxIndexBuilder}, but with a simple
* {@link IndexCodec} and BatchCache implementation.
*/
@Category(NeedsOwnMiniClusterTest.class)
@@ -151,7 +151,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
int count = 0;
- KeyValue kv;
+ Cell kv;
while ((kv = kvs.next()) != null) {
Cell next = expectedKvs.get(count++);
assertEquals(
@@ -302,9 +302,9 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
Map<String, String> indexerOpts = new HashMap<String, String>();
// just need to set the codec - we are going to set it later, but we need something here or the
// initializer blows up.
- indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY,
+ indexerOpts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
CoveredIndexCodecForTesting.class.getName());
- Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
+ Indexer.enableIndexing(desc, NonTxIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
// create the table
HBaseAdmin admin = UTIL.getHBaseAdmin();
@@ -315,8 +315,8 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
Indexer indexer =
(Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName());
- CoveredColumnsIndexBuilder builder =
- (CoveredColumnsIndexBuilder) indexer.getBuilderForTesting();
+ NonTxIndexBuilder builder =
+ (NonTxIndexBuilder) indexer.getBuilderForTesting();
VerifyingIndexCodec codec = new VerifyingIndexCodec();
builder.setIndexCodecForTesting(codec);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
index dbe78e7..281ad63 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered.example;
@@ -32,6 +24,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -49,105 +42,106 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
/**
- * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String}
- * constructor), {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize
- * the exception, and just return <tt>null</tt> to the client, which then just goes and retries.
+ * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String} constructor),
+ * {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize the exception, and just return
+ * <tt>null</tt> to the client, which then just goes and retries.
*/
@Category(NeedsOwnMiniClusterTest.class)
public class FailWithoutRetriesIT {
- private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
- @Rule
- public TableName table = new TableName();
+ private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
+ @Rule
+ public TableName table = new TableName();
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private String getIndexTableName() {
+ return Bytes.toString(table.getTableName()) + "_index";
+ }
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ public static class FailingTestCodec extends BaseIndexCodec {
- private String getIndexTableName() {
- return Bytes.toString(table.getTableName()) + "_index";
- }
+ @Override
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+ throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName());
+ }
- public static class FailingTestCodec extends BaseIndexCodec {
+ @Override
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
+ throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName());
+ }
+
+ @Override
+ public void setContext(TableState state, Mutation mutation) throws IOException {}
+
+ }
- @Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
- throw new RuntimeException("Intentionally failing deletes for "
- + FailWithoutRetriesIT.class.getName());
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // setup and verify the config
+ Configuration conf = UTIL.getConfiguration();
+ setUpConfigForMiniCluster(conf);
+ IndexTestingUtils.setupConfig(conf);
+ IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+ // start the cluster
+ UTIL.startMiniCluster();
}
- @Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
- throw new RuntimeException("Intentionally failing upserts for "
- + FailWithoutRetriesIT.class.getName());
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ UTIL.shutdownMiniCluster();
}
- }
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- // setup and verify the config
- Configuration conf = UTIL.getConfiguration();
- setUpConfigForMiniCluster(conf);
- IndexTestingUtils.setupConfig(conf);
- IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
- // start the cluster
- UTIL.startMiniCluster();
- }
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- /**
- * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
- * rethrowing the exception correctly?
- * <p>
- * We use a custom codec to enforce the thrown exception.
- * @throws Exception
- */
- @Test(timeout = 300000)
- public void testQuickFailure() throws Exception {
- // incorrectly setup indexing for the primary table - target index table doesn't exist, which
- // should quickly return to the client
- byte[] family = Bytes.toBytes("family");
- ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
- // values are [col1]
- fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
- CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
- // add the index family
- builder.addIndexGroup(fam1);
- // usually, we would create the index table here, but we don't for the sake of the test.
-
- // setup the primary table
- String primaryTable = Bytes.toString(table.getTableName());
- @SuppressWarnings("deprecation")
- HTableDescriptor pTable = new HTableDescriptor(primaryTable);
- pTable.addFamily(new HColumnDescriptor(family));
- // override the codec so we can use our test one
- builder.build(pTable, FailingTestCodec.class);
-
- // create the primary table
- HBaseAdmin admin = UTIL.getHBaseAdmin();
- admin.createTable(pTable);
- Configuration conf = new Configuration(UTIL.getConfiguration());
- // up the number of retries/wait time to make it obvious that we are failing with retries here
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
- conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
- HTable primary = new HTable(conf, primaryTable);
- primary.setAutoFlush(false, true);
-
- // do a simple put that should be indexed
- Put p = new Put(Bytes.toBytes("row"));
- p.add(family, null, Bytes.toBytes("value"));
- primary.put(p);
- try {
- primary.flushCommits();
- fail("Shouldn't have gotten a successful write to the primary table");
- } catch (RetriesExhaustedWithDetailsException e) {
- LOG.info("Correclty got a failure of the put!");
+ /**
+ * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't rethrowing the exception
+ * correctly?
+ * <p>
+ * We use a custom codec to enforce the thrown exception.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 300000)
+ public void testQuickFailure() throws Exception {
+ // incorrectly setup indexing for the primary table - target index table doesn't exist, which
+ // should quickly return to the client
+ byte[] family = Bytes.toBytes("family");
+ ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
+ // values are [col1]
+ fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
+ CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+ // add the index family
+ builder.addIndexGroup(fam1);
+ // usually, we would create the index table here, but we don't for the sake of the test.
+
+ // setup the primary table
+ String primaryTable = Bytes.toString(table.getTableName());
+ @SuppressWarnings("deprecation")
+ HTableDescriptor pTable = new HTableDescriptor(primaryTable);
+ pTable.addFamily(new HColumnDescriptor(family));
+ // override the codec so we can use our test one
+ builder.build(pTable, FailingTestCodec.class);
+
+ // create the primary table
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+ admin.createTable(pTable);
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ // up the number of retries/wait time to make it obvious that we are failing with retries here
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+ conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
+ HTable primary = new HTable(conf, primaryTable);
+ primary.setAutoFlush(false, true);
+
+ // do a simple put that should be indexed
+ Put p = new Put(Bytes.toBytes("row"));
+ p.add(family, null, Bytes.toBytes("value"));
+ primary.put(p);
+ try {
+ primary.flushCommits();
+ fail("Shouldn't have gotten a successful write to the primary table");
+ } catch (RetriesExhaustedWithDetailsException e) {
+ LOG.info("Correclty got a failure of the put!");
+ }
+ primary.close();
}
- primary.close();
- }
}
\ No newline at end of file
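For readers wondering what "subclassed correctly" means in the class javadoc above: HBase reconstructs remote exceptions reflectively through a single-String constructor, so any DoNotRetryIOException subtype must provide one, or the client sees null and keeps retrying. A minimal sketch, with a hypothetical exception name:

    import org.apache.hadoop.hbase.DoNotRetryIOException;

    public class IndexBuildFailureException extends DoNotRetryIOException {
        // Required: HBase rebuilds remote exceptions via a (String) constructor;
        // without it, deserialization fails and the client retries instead of
        // failing fast
        public IndexBuildFailureException(String message) {
            super(message);
        }
    }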
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index a6e36cb..bcadc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -19,8 +19,8 @@ package org.apache.phoenix.hbase.index;
import java.io.IOException;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
public interface ValueGetter {
@@ -32,7 +32,7 @@ public interface ValueGetter {
* present.
* @throws IOException if there is an error accessing the underlying data storage
*/
- public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException;
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException;
public byte[] getRowKey();
}
\ No newline at end of file
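Since ImmutableBytesPtr extends ImmutableBytesWritable, the signature change above widens what implementations may return. A minimal map-backed implementation, for illustration only:

    import java.util.Map;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.hbase.index.ValueGetter;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    public class MapValueGetter implements ValueGetter {
        private final byte[] rowKey;
        private final Map<ColumnReference, ImmutableBytesWritable> values;

        public MapValueGetter(byte[] rowKey,
                Map<ColumnReference, ImmutableBytesWritable> values) {
            this.rowKey = rowKey;
            this.values = values;
        }

        @Override
        public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
            return values.get(ref); // null signals "column not present"
        }

        @Override
        public byte[] getRowKey() {
            return rowKey;
        }
    }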
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index f9df296..dfb9ad4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -1,95 +1,127 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.builder;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
/**
* Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing.
* <p>
- * You should extend this class, rather than implementing IndexBuilder directly to maintain
- * compatability going forward.
+ * You should extend this class, rather than implementing IndexBuilder directly, to maintain compatibility going forward.
* <p>
- * Generally, you should consider using one of the implemented IndexBuilders (e.g
- * {@link CoveredColumnsIndexBuilder}) as there is a lot of work required to keep an index table
- * up-to-date.
+ * Generally, you should consider using one of the implemented IndexBuilders (e.g. {@link NonTxIndexBuilder}) as there is
+ * a lot of work required to keep an index table up-to-date.
*/
public abstract class BaseIndexBuilder implements IndexBuilder {
+ public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
+ private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
- private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
- protected boolean stopped;
+ protected boolean stopped;
+ protected RegionCoprocessorEnvironment env;
+ protected IndexCodec codec;
- @Override
- public void extendBaseIndexBuilderInstead() { }
-
- @Override
- public void setup(RegionCoprocessorEnvironment conf) throws IOException {
- // noop
- }
+ abstract protected boolean useRawScanToPrimeBlockCache();
+
+ @Override
+ public void extendBaseIndexBuilderInstead() {}
- @Override
- public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- // noop
- }
+ @Override
+ public void setup(RegionCoprocessorEnvironment env) throws IOException {
+ this.env = env;
+ // set up the phoenix codec. Generally, this will just be the standard one, but abstracting here
+ // so we can use it later when generalizing covered indexes
+ Configuration conf = env.getConfiguration();
+ Class<? extends IndexCodec> codecClass = conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
+ try {
+ Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
+ meth.setAccessible(true);
+ this.codec = meth.newInstance();
+ this.codec.initialize(env);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
- @Override
- public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
- // noop
- }
-
- /**
- * By default, we always attempt to index the mutation. Commonly this can be slow (because the
- * framework spends the time to do the indexing, only to realize that you don't need it) or not
- * ideal (if you want to turn on/off indexing on a table without completely reloading it).
- * @throws IOException
- */
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- return true;
- }
+ @Override
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ // noop
+ }
- /**
- * {@inheritDoc}
- * <p>
- * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each
- * mutation always applies to different rows, even if they are in the same batch, or are
- * independent updates.
- */
- @Override
- public byte[] getBatchId(Mutation m) {
- return null;
- }
+ @Override
+ public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ // noop
+ }
- @Override
- public void stop(String why) {
- LOG.debug("Stopping because: " + why);
- this.stopped = true;
- }
+ /**
+ * By default, we always attempt to index the mutation. Commonly this can be slow (because the framework spends the
+ * time to do the indexing, only to realize that you don't need it) or not ideal (if you want to turn on/off
+ * indexing on a table without completely reloading it).
+ *
+ * @throws IOException
+ */
+ @Override
+ public boolean isEnabled(Mutation m) throws IOException {
+ // ask the codec to see if we should even attempt indexing
+ return this.codec.isEnabled(m);
+ }
- @Override
- public boolean isStopped() {
- return this.stopped;
- }
+ /**
+ * Exposed for testing!
+ *
+ * @param codec
+ * codec to use for this instance of the builder
+ */
+ public void setIndexCodecForTesting(IndexCodec codec) {
+ this.codec = codec;
+ }
+
+ @Override
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each mutation always applies
+ * to different rows, even if they are in the same batch, or are independent updates.
+ */
+ @Override
+ public byte[] getBatchId(Mutation m) {
+ return this.codec.getBatchId(m);
+ }
+
+ @Override
+ public void stop(String why) {
+ LOG.debug("Stopping because: " + why);
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
}
\ No newline at end of file
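The reflective codec lookup in setup() above means wiring a custom codec is purely a configuration matter: the key must name an IndexCodec implementation with a no-argument constructor. A minimal sketch; the codec class name is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CodecWiring {
        public static Configuration wireCodec() {
            Configuration conf = HBaseConfiguration.create();
            // same key as BaseIndexBuilder.CODEC_CLASS_NAME_KEY in the diff
            conf.set("org.apache.hadoop.hbase.index.codec.class",
                    "org.example.MyIndexCodec"); // must have a no-arg constructor
            return conf;
        }
    }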
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index ba9534c..d5fd34d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -156,19 +155,6 @@ public class IndexBuildManager implements Stoppable {
return results;
}
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
- // all we get is a single update, so it would probably just go slower if we needed to queue it
- // up. It will increase underlying resource contention a little bit, but the mutation case is
- // far more common, so let's not worry about it for now.
- // short circuit so we don't waste time.
- if (!this.delegate.isEnabled(delete)) {
- return null;
- }
-
- return delegate.getIndexUpdate(delete);
-
- }
-
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
Collection<KeyValue> filtered) throws IOException {
// this is run async, so we can take our time here
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index b91a52a..194fdcc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -79,32 +78,26 @@ public interface IndexBuilder extends Stoppable {
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
- /**
- * The counter-part to {@link #getIndexUpdate(Mutation)} - your opportunity to update any/all
- * index tables based on the delete of the primary table row. This is only called for cases where
- * the client sends a single delete ({@link HTable#delete}). We separate this method from
- * {@link #getIndexUpdate(Mutation)} only for the ease of implementation as the delete path has
- * subtly different semantics for updating the families/timestamps from the generic batch path.
- * <p>
- * Its up to your implementation to ensure that timestamps match between the primary and index
- * tables.
- * <p>
- * Implementers must ensure that this method is thread-safe - it could (and probably will) be
- * called concurrently for different mutations, which may or may not be part of the same batch.
- * @param delete {@link Delete} to the primary table that may be indexed
- * @return a {@link Map} of the mutations to make -> target index table name
- * @throws IOException on failure
- */
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException;
-
- /**
- * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal
- * flush or compaction mechanisms.
- * @param filtered {@link KeyValue}s that previously existed, but won't be included in further
- * output from HBase.
- * @return a {@link Map} of the mutations to make -> target index table name
- * @throws IOException on failure
- */
+ /**
+ * Build an index update to clean up the index when we remove {@link KeyValue}s via the normal flush or compaction
+ * mechanisms. Currently not implemented by any implementor nor called, but left here to be implemented if we
+ * ever need it. In Jesse's words:
+ *
+ * Arguably, this is a correctness piece that should be used, but isn't. Basically, it *could* be that
+ * if a compaction/flush were to remove a key (too old, too many versions) you might want to clean up the index table
+ * as well, if it were to get out of sync with the primary table. For instance, you might get multiple versions of
+ * the same row, which should eventually age off the oldest version. However, in the index table there would only
+ * ever be two entries for that row - the first one, matching the original row, and the delete marker for the index
+ * update, set when we got a newer version of the primary row. So, a basic HBase scan wouldn't show the index update
+ * because it's covered by the delete marker, but an older timestamp-based read would actually show the index row, even
+ * after the primary table row is gone due to the MAX_VERSIONS requirement.
+ *
+ * @param filtered {@link KeyValue}s that previously existed, but won't be included
+ * in further output from HBase.
+ *
+ * @return a {@link Map} of the mutations to make -> target index table name
+ * @throws IOException on failure
+ */
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
Collection<KeyValue> filtered)
throws IOException;
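The scenario in the javadoc above can be observed directly: a plain scan of the index table hides the stale row behind its delete marker, while a raw, all-versions scan still surfaces both the row and the marker. A minimal probe, assuming an HTable handle to the index table; the class name is illustrative:

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class StaleIndexProbe {
        public static void probe(HTable indexTable) throws Exception {
            Scan scan = new Scan();
            scan.setRaw(true);     // include delete markers in the results
            scan.setMaxVersions(); // return every version, not just the newest
            try (ResultScanner rs = indexTable.getScanner(scan)) {
                for (Result r : rs) {
                    System.out.println(Bytes.toStringBinary(r.getRow()));
                }
            }
        }
    }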
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
index e707ea2..5e0da3c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.covered;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
/**
@@ -27,9 +28,9 @@ import org.apache.hadoop.hbase.KeyValue;
*/
public class Batch {
- private static final long pointDeleteCode = KeyValue.Type.Delete.getCode();
+ private static final byte pointDeleteCode = KeyValue.Type.Delete.getCode();
private final long timestamp;
- private List<KeyValue> batch = new ArrayList<KeyValue>();
+ private List<Cell> batch = new ArrayList<Cell>();
private boolean allPointDeletes = true;
/**
@@ -39,8 +40,8 @@ public class Batch {
this.timestamp = ts;
}
- public void add(KeyValue kv){
- if (pointDeleteCode != kv.getType()) {
+ public void add(Cell kv){
+ if (pointDeleteCode != kv.getTypeByte()) {
allPointDeletes = false;
}
batch.add(kv);
@@ -54,7 +55,7 @@ public class Batch {
return this.timestamp;
}
- public List<KeyValue> getKvs() {
+ public List<Cell> getKvs() {
return this.batch;
}
}
\ No newline at end of file
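The Cell-typed Batch above is consumed by the builders' timestamp batching (see the createTimestampBatchesFromKeyValues logic in the builder below): cells are grouped by timestamp so each batch reflects one point-in-time edit to the row. A minimal sketch of that grouping, for illustration only; it omits the LATEST_TIMESTAMP normalization the real builder performs:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.phoenix.hbase.index.covered.Batch;

    public class TimestampBatcher {
        public static Collection<Batch> batchByTimestamp(List<Cell> cells) {
            Map<Long, Batch> batches = new HashMap<Long, Batch>();
            for (Cell kv : cells) {
                long ts = kv.getTimestamp();
                Batch batch = batches.get(ts);
                if (batch == null) {
                    batch = new Batch(ts);
                    batches.put(ts, batch);
                }
                batch.add(kv); // also tracks whether the batch is all point deletes
            }
            return batches.values();
        }
    }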
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
deleted file mode 100644
index 6524fd4..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-
-/**
- * Build covered indexes for phoenix updates.
- * <p>
- * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't
- * need to do any extra synchronization in the IndexBuilder.
- * <p>
- * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or
- * flush, leading to a bloated index that needs to be cleaned up by a background process.
- */
-public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
-
- private static final Log LOG = LogFactory.getLog(CoveredColumnsIndexBuilder.class);
- public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
-
- protected RegionCoprocessorEnvironment env;
- protected IndexCodec codec;
- protected LocalHBaseState localTable;
-
- @Override
- public void setup(RegionCoprocessorEnvironment env) throws IOException {
- this.env = env;
- // setup the phoenix codec. Generally, this will just be in standard one, but abstracting here
- // so we can use it later when generalizing covered indexes
- Configuration conf = env.getConfiguration();
- Class<? extends IndexCodec> codecClass =
- conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
- try {
- Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
- meth.setAccessible(true);
- this.codec = meth.newInstance();
- this.codec.initialize(env);
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- this.localTable = new LocalTable(env);
- }
-
- @Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
- // build the index updates for each group
- IndexUpdateManager updateMap = new IndexUpdateManager();
-
- batchMutationAndAddUpdates(updateMap, mutation);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap);
- }
-
- return updateMap.toMap();
- }
-
- /**
- * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each
- * key-value in the update to see if it matches the others. Generally, this will be the case, but
- * you can add kvs to a mutation that don't all have the timestamp, so we need to manage
- * everything in batches based on timestamp.
- * <p>
- * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
- * @param updateMap index updates into which to add new updates. Modified as a side-effect.
- * @param state current state of the row for the mutation.
- * @param m mutation to batch
- * @throws IOException
- */
- private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
- // split the mutation into timestamp-based batches
- Collection<Batch> batches = createTimestampBatchesFromMutation(m);
-
- // create a state manager, so we can manage each batch
- LocalTableState state = new LocalTableState(env, localTable, m);
-
- // go through each batch of keyvalues and build separate index entries for each
- boolean cleanupCurrentState = true;
- for (Batch batch : batches) {
- /*
- * We have to split the work between the cleanup and the update for each group because when we
- * update the current state of the row for the current batch (appending the mutations for the
- * current batch) the next group will see that as the current state, which will can cause the
- * a delete and a put to be created for the next group.
- */
- if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
- cleanupCurrentState = false;
- }
- }
- }
-
- /**
- * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any
- * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
- * the time the method is called.
- * @param m {@link Mutation} from which to extract the {@link KeyValue}s
- * @return the mutation, broken into batches and sorted in ascending order (smallest first)
- */
- protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
- Map<Long, Batch> batches = new HashMap<Long, Batch>();
- for (List<Cell> family : m.getFamilyCellMap().values()) {
- List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
- createTimestampBatchesFromKeyValues(familyKVs, batches);
- }
- // sort the batches
- List<Batch> sorted = new ArrayList<Batch>(batches.values());
- Collections.sort(sorted, new Comparator<Batch>() {
- @Override
- public int compare(Batch o1, Batch o2) {
- return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
- }
- });
- return sorted;
- }
-
- /**
- * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
- * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
- * the time the method is called.
- * @param kvs {@link KeyValue}s to break into batches
- * @param batches to update with the given kvs
- */
- protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
- Map<Long, Batch> batches) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- byte[] nowBytes = Bytes.toBytes(now);
-
- // batch kvs by timestamp
- for (KeyValue kv : kvs) {
- long ts = kv.getTimestamp();
- // override the timestamp to the current time, so the index and primary tables match
- // all the keys with LATEST_TIMESTAMP will then be put into the same batch
- if (kv.updateLatestStamp(nowBytes)) {
- ts = now;
- }
- Batch batch = batches.get(ts);
- if (batch == null) {
- batch = new Batch(ts);
- batches.put(ts, batch);
- }
- batch.add(kv);
- }
- }
-
- /**
- * For a single batch, get all the index updates and add them to the updateMap
- * <p>
- * This method manages cleaning up the entire history of the row from the given timestamp forward
- * for out-of-order (e.g. 'back in time') updates.
- * <p>
- * If things arrive out of order (client is using custom timestamps) we should still see the index
- * in the correct order (assuming we scan after the out-of-order update in finished). Therefore,
- * we when we aren't the most recent update to the index, we need to delete the state at the
- * current timestamp (similar to above), but also issue a delete for the added index updates at
- * the next newest timestamp of any of the columns in the update; we need to cleanup the insert so
- * it looks like it was also deleted at that next newest timestamp. However, its not enough to
- * just update the one in front of us - that column will likely be applied to index entries up the
- * entire history in front of us, which also needs to be fixed up.
- * <p>
- * However, the current update usually will be the most recent thing to be added. In that case,
- * all we need to is issue a delete for the previous index row (the state of the row, without the
- * update applied) at the current timestamp. This gets rid of anything currently in the index for
- * the current state of the row (at the timestamp). Then we can just follow that by applying the
- * pending update and building the index update based on the new row state.
- * @param updateMap map to update with new index elements
- * @param batch timestamp-based batch of edits
- * @param state local state to update and pass to the codec
- * @param requireCurrentStateCleanup <tt>true</tt> if we should should attempt to cleanup the
- * current state of the table, in the event of a 'back in time' batch. <tt>false</tt>
- * indicates we should not attempt the cleanup, e.g. an earlier batch already did the
- * cleanup.
- * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put),
- * <tt>false</tt> otherwise
- * @throws IOException
- */
- private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch,
- LocalTableState state, boolean requireCurrentStateCleanup) throws IOException {
-
- // need a temporary manager for the current batch. It should resolve any conflicts for the
- // current batch. Essentially, we can get the case where a batch doesn't change the current
- // state of the index (all Puts are covered by deletes), in which case we don't want to add
- // anything
- // A. Get the correct values for the pending state in the batch
- // A.1 start by cleaning up the current state - as long as there are key-values in the batch
- // that are indexed, we need to change the current state of the index. Its up to the codec to
- // determine if we need to make any cleanup given the pending update.
- long batchTs = batch.getTimestamp();
- state.setPendingUpdates(batch.getKvs());
- addCleanupForCurrentBatch(updateMap, batchTs, state);
-
- // A.2 do a single pass first for the updates to the current state
- state.applyPendingUpdates();
- long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
- // if all the updates are the latest thing in the index, we are done - don't go and fix history
- if (ColumnTracker.isNewestTime(minTs)) {
- return false;
- }
-
- // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
- // index. after this, we have the correct view of the index, from the batch up to the index
- while(!ColumnTracker.isNewestTime(minTs) ){
- minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
- }
-
- // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
- if (requireCurrentStateCleanup) {
- // roll back the pending update. This is needed so we can remove all the 'old' index entries.
- // We don't need to do the puts here, but just the deletes at the given timestamps since we
- // just want to completely hide the incorrect entries.
- state.rollback(batch.getKvs());
- // setup state
- state.setPendingUpdates(batch.getKvs());
-
- // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
- // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
- // because the update may have a different set of columns or value based on the update).
- cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
-
- // have to roll the state forward again, so the current state is correct
- state.applyPendingUpdates();
- return true;
- }
- return false;
- }
-
- private long addUpdateForGivenTimestamp(long ts, LocalTableState state,
- IndexUpdateManager updateMap) throws IOException {
- state.setCurrentTimestamp(ts);
- ts = addCurrentStateMutationsForBatch(updateMap, state);
- return ts;
- }
-
- private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs,
- LocalTableState state) throws IOException {
- // get the cleanup for the current state
- state.setCurrentTimestamp(batchTs);
- addDeleteUpdatesToMap(updateMap, state, batchTs);
- // ignore any index tracking from the delete
- state.resetTrackedColumns();
- }
-
- /**
- * Add the necessary mutations for the pending batch on the local state. Handles rolling up
- * through history to determine the index changes after applying the batch (for the case where the
- * batch is back in time).
- * @param updateMap to update with index mutations
- * @param batch to apply to the current state
- * @param state current state of the table
- * @return the minimum timestamp across all index columns requested. If
- * {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
- * timestamp, we know that this <i>was not a back-in-time update</i>.
- * @throws IOException
- */
- private long
- addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state) throws IOException {
-
- // get the index updates for this current batch
- Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
- state.resetTrackedColumns();
-
- /*
- * go through all the pending updates. If we are sure that all the entries are the latest
- * timestamp, we can just add the index updates and move on. However, if there are columns that
- * we skip past (based on the timestamp of the batch), we need to roll back up the history.
- * Regardless of whether or not they are the latest timestamp, the entries here are going to be
- * correct for the current batch timestamp, so we add them to the updates. The only thing we
- * really care about is whether we need to roll up the history and fix it as we go.
- */
- // timestamp of the next update we need to track
- long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
- List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
- for (IndexUpdate update : upserts) {
- // this is the one bit where we check the timestamps
- final ColumnTracker tracker = update.getIndexedColumns();
- long trackerTs = tracker.getTS();
- // update the next min TS we need to track
- if (trackerTs < minTs) {
- minTs = tracker.getTS();
- }
- // track index hints for the next round. Hint if we need an update for that column for the
- // next timestamp. These columns clearly won't need to update as we go through time as they
- // already match the most recent possible thing.
- boolean needsCleanup = false;
- if (tracker.hasNewerTimestamps()) {
- columnHints.add(tracker);
- // this update also needs to be cleaned up at the next timestamp because it is not the latest.
- needsCleanup = true;
- }
-
-
- // only make the put if the index update has been setup
- if (update.isValid()) {
- byte[] table = update.getTableName();
- Mutation mutation = update.getUpdate();
- updateMap.addIndexUpdate(table, mutation);
-
- // only make the cleanup if we made a put and need cleanup
- if (needsCleanup) {
- // there is a TS for the interested columns that is greater than that of the columns in the
- // put. Therefore, we need to issue a delete at the same timestamp
- Delete d = new Delete(mutation.getRow());
- d.setTimestamp(tracker.getTS());
- updateMap.addIndexUpdate(table, d);
- }
- }
- }
- return minTs;
- }
-
- /**
- * Cleanup the index based on the current state from the given batch. Iterates over each timestamp
- * (for the indexed rows) for the current state of the table and cleans up all the existing
- * entries generated by the codec.
- * <p>
- * Adds all pending updates to the updateMap
- * @param updateMap updated with the pending index updates from the codec
- * @param batchTs timestamp from which we should cleanup
- * @param state current state of the primary table. Should already be set up in the correct state
- * from which we want to clean up.
- * @throws IOException
- */
- private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap,
- long batchTs, LocalTableState state) throws IOException {
- // get the cleanup for the current state
- state.setCurrentTimestamp(batchTs);
- addDeleteUpdatesToMap(updateMap, state, batchTs);
- Set<ColumnTracker> trackers = state.getTrackedColumns();
- long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
- for (ColumnTracker tracker : trackers) {
- if (tracker.getTS() < minTs) {
- minTs = tracker.getTS();
- }
- }
- state.resetTrackedColumns();
- if (!ColumnTracker.isNewestTime(minTs)) {
- state.setHints(Lists.newArrayList(trackers));
- cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
- }
- }
-
-
- /**
- * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then
- * add them to the update map.
- * <p>
- * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates
- * applied, etc).
- * @throws IOException
- */
- protected void
- addDeleteUpdatesToMap(IndexUpdateManager updateMap,
- LocalTableState state, long ts) throws IOException {
- Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
- if (cleanup != null) {
- for (IndexUpdate d : cleanup) {
- if (!d.isValid()) {
- continue;
- }
- // override the timestamps in the delete to match the current batch.
- Delete remove = (Delete)d.getUpdate();
- remove.setTimestamp(ts);
- updateMap.addIndexUpdate(d.getTableName(), remove);
- }
- }
- }
-
- @Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
- // stores all the return values
- IndexUpdateManager updateMap = new IndexUpdateManager();
-
- // We have to figure out which kind of delete it is, since we need to do different things if it's
- // a general (row) delete, versus a delete of just a single column or family
- Map<byte[], List<Cell>> families = d.getFamilyCellMap();
-
- /*
- * Option 1: it's a row delete marker, so we just need to delete the most recent state for each
- * group, as of the specified timestamp in the delete. This can happen if we have a single row
- * update and it is part of a batch mutation (prepare doesn't happen until later... maybe a
- * bug?). In a single delete, this delete gets all the column families appended, so the family
- * map won't be empty by the time it gets here.
- */
- if (families.size() == 0) {
- LocalTableState state = new LocalTableState(env, localTable, d);
- // get a consistent view of 'now'
- long now = d.getTimeStamp();
- if (now == HConstants.LATEST_TIMESTAMP) {
- now = EnvironmentEdgeManager.currentTimeMillis();
- // update the delete's idea of 'now' to be consistent with the index
- d.setTimestamp(now);
- }
- // get deletes from the codec
- // we only need to get deletes and not add puts because this delete covers all columns
- addDeleteUpdatesToMap(updateMap, state, now);
-
- /*
- * Update the current state for all the kvs in the delete. Generally, we would just iterate
- * the family map, but since we got here, the family map is empty! Therefore, we need to fake a
- * bunch of family deletes (just like how HRegion#prepareDelete works). This is just needed
- * for the current version of HBase that has an issue where the batch update doesn't update the
- * deletes before calling the hook.
- */
- byte[] deleteRow = d.getRow();
- for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
- state.addPendingUpdates(new KeyValue(deleteRow, family, null, now,
- KeyValue.Type.DeleteFamily));
- }
- } else {
- // Option 2: It's actually a bunch of single updates, which can have different timestamps.
- // Therefore, we need to do something similar to the put case and batch by timestamp
- batchMutationAndAddUpdates(updateMap, d);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
- }
-
- return updateMap.toMap();
- }
-
- @Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered) throws IOException {
- // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
- return null;
- }
-
- /**
- * Exposed for testing!
- * @param codec codec to use for this instance of the builder
- */
- public void setIndexCodecForTesting(IndexCodec codec) {
- this.codec = codec;
- }
-
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- // ask the codec to see if we should even attempt indexing
- return this.codec.isEnabled(m);
- }
-}
\ No newline at end of file
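
To make the back-in-time handling described in the removed javadoc concrete, here is a minimal sketch of the delete-then-put pattern: a Delete at the batch timestamp hides the index entry for the row's previous state, and a Put at the same timestamp writes the entry for the new state. The index table name, row keys, family, and qualifier below are placeholders, not values from this commit.

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;

public class BackInTimeCleanupSketch {
    public static void main(String[] args) {
        IndexUpdateManager updateMap = new IndexUpdateManager();
        byte[] indexTable = Bytes.toBytes("MY_INDEX"); // placeholder index table name
        long batchTs = 1000L;                          // timestamp of the incoming batch

        // hide the index row for the previous state of the primary row
        Delete cleanup = new Delete(Bytes.toBytes("v1,v2,v3")); // old index row key
        cleanup.setTimestamp(batchTs);
        updateMap.addIndexUpdate(indexTable, cleanup);

        // write the index row for the new state at the same timestamp
        Put pending = new Put(Bytes.toBytes("v1,v2,v3-new"));   // new index row key
        pending.add(Bytes.toBytes("0"), Bytes.toBytes("_0"), batchTs, new byte[0]);
        updateMap.addIndexUpdate(indexTable, pending);

        // the builder ultimately drains these pairs via updateMap.toMap()
    }
}
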
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index daa631b..e3ef831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered;
@@ -22,89 +14,93 @@ import java.io.IOException;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
import org.apache.phoenix.index.BaseIndexCodec;
-
/**
* Codec for creating index updates from the current state of a table.
* <p>
- * Generally, you should extend {@link BaseIndexCodec} instead, to help maintain compatibility as
- * features need to be added to the codec, as well as potentially not having to implement some
- * methods.
+ * Generally, you should extend {@link BaseIndexCodec} instead, to help maintain compatibility as features need to be
+ * added to the codec, as well as potentially not having to implement some methods.
*/
public interface IndexCodec {
- /**
- * Do any code initialization necessary
- * @param env environment in which the codec is operating
- * @throws IOException if the codec cannot be initialized correctly
- */
- public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+ /**
+ * Do any code initialization necessary
+ *
+ * @param env
+ * environment in which the codec is operating
+ * @throws IOException
+ * if the codec cannot be initialized correctly
+ */
+ public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+
+ /**
+ * Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is
+ * specified and no columns are returned) mapped to the table name. For instance, say you have an index 'myIndex'
+ * with row:
+ *
+ * <pre>
+ * v1,v2,v3 | CF:CQ0 | rowkey
+ * | CF:CQ1 | rowkey
+ * </pre>
+ *
+ * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'.
+ *
+ * @param state
+ * the current state of the table that needs to be cleaned up. Generally, you only care about the latest
+ * column values, for each column you are indexing for each index table.
+ * @return the pairs of (deletes, index table name) that should be applied.
+ * @throws IOException
+ */
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
- /**
- * Get the index cleanup entries. Currently, this must return just single row deletes (where just
- * the row-key is specified and no columns are returned) mapped to the table name. For instance,
- * say you have an index 'myIndex' with row:
- *
- * <pre>
- * v1,v2,v3 | CF:CQ0 | rowkey
- * | CF:CQ1 | rowkey
- * </pre>
- *
- * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'.
- * @param state the current state of the table that needs to be cleaned up. Generally, you only
- * care about the latest column values, for each column you are indexing for each index
- * table.
- * @return the pairs of (deletes, index table name) that should be applied.
- * @throws IOException
- */
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+ // the table state already has the pending update applied before this is called
+ // to get the new index entries
+ /**
+ * Get the index updates for the primary table state, for each index table. The returned {@link Put}s need to be
+ * fully specified (including timestamp) to minimize passes over the same key-values multiple times.
+ * <p>
+ * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so the index entries
+ * match the primary table row. This could be managed at a higher level, but would require iterating all the kvs in
+ * the Put again - very inefficient when compared to the current interface where you must provide a timestamp
+ * anyways (so you might as well provide the right one).
+ *
+ * @param state
+ * the current state of the table that needs an index update. Generally, you only care about the latest
+ * column values, for each column you are indexing for each index table.
+ * @return the pairs of (updates,index table name) that should be applied.
+ * @throws IOException
+ */
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
- // table state has the pending update already applied, before calling
- // get the new index entries
- /**
- * Get the index updates for the primary table state, for each index table. The returned
- * {@link Put}s need to be fully specified (including timestamp) to minimize passes over the same
- * key-values multiple times.
- * <p>
- * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so
- * the index entries match the primary table row. This could be managed at a higher level, but
- * would require iterating all the kvs in the Put again - very inefficient when compared to the
- * current interface where you must provide a timestamp anyways (so you might as well provide the
- * right one).
- * @param state the current state of the table that needs an index update. Generally, you only
- * care about the latest column values, for each column you are indexing for each index
- * table.
- * @return the pairs of (updates,index table name) that should be applied.
- * @throws IOException
- */
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+ /**
+ * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
+ * take place, we can save a lot of time on the regular Put path. By making it dynamic, we can save offlining and
+ * then onlining a table just to turn indexing on.
+ * <p>
+ * We can also be smart about even indexing a given update here too - if the update doesn't contain any columns that
+ * we care about indexing, we can save the effort of analyzing the put any further.
+ *
+ * @param m
+ * mutation that should be indexed.
+ * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table basis, as each
+ * codec is instantiated per-region.
+ * @throws IOException
+ */
+ public boolean isEnabled(Mutation m) throws IOException;
- /**
- * This allows the codec to dynamically change whether or not indexing should take place for a
- * table. If it doesn't take place, we can save a lot of time on the regular Put path. By making
- * it dynamic, we can save offlining and then onlining a table just to turn indexing on.
- * <p>
- * We can also be smart about even indexing a given update here too - if the update doesn't
- * contain any columns that we care about indexing, we can save the effort of analyzing the put
- * any further.
- * @param m mutation that should be indexed.
- * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
- * basis, as each codec is instantiated per-region.
- * @throws IOException
- */
- public boolean isEnabled(Mutation m) throws IOException;
+ /**
+ * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of
+ * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently.
+ * <p>
+ * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each
+ * batch. Otherwise, we cannot guarantee index correctness</b>
+ *
+ * @param m
+ * mutation that may or may not be part of the batch
+ * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
+ */
+ public byte[] getBatchId(Mutation m);
- /**
- * Get the batch identifier of the given mutation. Generally, updates to the table will take place
- * in a batch of updates; if we know that the mutation is part of a batch, we can build the state
- * much more intelligently.
- * <p>
- * <b>If you have batches that have multiple updates to the same row state, you must specify a
- * batch id for each batch. Otherwise, we cannot guarantee index correctness</b>
- * @param m mutation that may or may not be part of the batch
- * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
- */
- public byte[] getBatchId(Mutation m);
+ public void setContext(TableState state, Mutation mutation) throws IOException;
}
\ No newline at end of file
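
As a rough illustration of the contract above, a trivial codec that opts out of indexing might look like the sketch below. It extends BaseIndexCodec, which is assumed (per the javadoc) to supply defaults for initialize and getBatchId; the class name is hypothetical.

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.index.BaseIndexCodec;

public class NoOpIndexCodec extends BaseIndexCodec {

    @Override
    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
        // no (delete, index table) pairs to clean up
        return Collections.<IndexUpdate>emptyList();
    }

    @Override
    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
        // the state already has the pending update applied when this is called
        return Collections.<IndexUpdate>emptyList();
    }

    @Override
    public boolean isEnabled(Mutation m) throws IOException {
        // dynamically skip indexing, saving work on the regular Put path
        return false;
    }

    @Override
    public void setContext(TableState state, Mutation mutation) throws IOException {
        // no per-mutation context needed for this sketch
    }
}
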
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index e4bc193..f47a71a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered;
@@ -34,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -42,203 +34,250 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+import com.google.common.collect.Maps;
/**
* Manage the state of the HRegion's view of the table, for the single row.
* <p>
- * Currently, this is a single-use object - you need to create a new one for each row that you need
- * to manage. In the future, we could make this object reusable, but for the moment it's easier to
- * manage as a throw-away object.
+ * Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the
+ * future, we could make this object reusable, but for the moment it's easier to manage as a throw-away object.
* <p>
- * This class is <b>not</b> thread-safe - it requires external synchronization if accessed
- * concurrently.
+ * This class is <b>not</b> thread-safe - it requires external synchronization if accessed concurrently.
*/
public class LocalTableState implements TableState {
- private long ts;
- private RegionCoprocessorEnvironment env;
- private KeyValueStore memstore;
- private LocalHBaseState table;
- private Mutation update;
- private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
- private ScannerBuilder scannerBuilder;
- private List<KeyValue> kvs = new ArrayList<KeyValue>();
- private List<? extends IndexedColumnGroup> hints;
- private CoveredColumns columnSet;
-
- public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
- this.env = environment;
- this.table = table;
- this.update = update;
- this.memstore = new IndexMemStore();
- this.scannerBuilder = new ScannerBuilder(memstore, update);
- this.columnSet = new CoveredColumns();
- }
-
- public void addPendingUpdates(KeyValue... kvs) {
- if (kvs == null) return;
- addPendingUpdates(Arrays.asList(kvs));
- }
-
- public void addPendingUpdates(List<KeyValue> kvs) {
- if(kvs == null) return;
- setPendingUpdates(kvs);
- addUpdate(kvs);
- }
-
- private void addUpdate(List<KeyValue> list) {
- addUpdate(list, true);
- }
-
- private void addUpdate(List<KeyValue> list, boolean overwrite) {
- if (list == null) return;
- for (KeyValue kv : list) {
- this.memstore.add(kv, overwrite);
- }
- }
-
- @Override
- public RegionCoprocessorEnvironment getEnvironment() {
- return this.env;
- }
-
- @Override
- public long getCurrentTimestamp() {
- return this.ts;
- }
-
- @Override
- public void setCurrentTimestamp(long timestamp) {
- this.ts = timestamp;
- }
-
- public void resetTrackedColumns() {
- this.trackedColumns.clear();
- }
-
- public Set<ColumnTracker> getTrackedColumns() {
- return this.trackedColumns;
- }
-
- @Override
- public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
- Collection<? extends ColumnReference> indexedColumns) throws IOException {
- ensureLocalStateInitialized(indexedColumns);
- // filter out things with a newer timestamp and track the column references to which it applies
- ColumnTracker tracker = new ColumnTracker(indexedColumns);
- synchronized (this.trackedColumns) {
- // we haven't seen this set of columns before, so we need to create a new tracker
- if (!this.trackedColumns.contains(tracker)) {
- this.trackedColumns.add(tracker);
- }
- }
-
- Scanner scanner =
- this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
-
- return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
- }
-
- /**
- * Initialize the managed local state. Generally, this will only be called by
- * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside.
- * Even then, there is still fairly low contention as each new Put/Delete will have its own table
- * state.
- */
- private synchronized void ensureLocalStateInitialized(
- Collection<? extends ColumnReference> columns) throws IOException {
- // check to see if we haven't initialized any columns yet
- Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
- // we have all the columns loaded, so we are good to go.
- if (toCover.isEmpty()) {
- return;
- }
-
- // add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
-
- // add the covered columns to the set
- for (ColumnReference ref : toCover) {
- this.columnSet.addColumn(ref);
- }
- }
-
- @Override
- public Map<String, byte[]> getUpdateAttributes() {
- return this.update.getAttributesMap();
- }
-
- @Override
- public byte[] getCurrentRowKey() {
- return this.update.getRow();
- }
-
- public Result getCurrentRowState() {
- KeyValueScanner scanner = this.memstore.getScanner();
- List<Cell> kvs = new ArrayList<Cell>();
- while (scanner.peek() != null) {
- try {
- kvs.add(scanner.next());
- } catch (IOException e) {
- // this should never happen - something has gone terribly awry if it has
- throw new RuntimeException("Local MemStore threw IOException!");
- }
- }
- return Result.create(kvs);
- }
-
- /**
- * Helper to add a {@link Mutation} to the values stored for the current row
- * @param pendingUpdate update to apply
- */
- public void addUpdateForTesting(Mutation pendingUpdate) {
- for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
- List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
- addUpdate(edits);
- }
- }
-
- /**
- * @param hints
- */
- public void setHints(List<? extends IndexedColumnGroup> hints) {
- this.hints = hints;
- }
-
- @Override
- public List<? extends IndexedColumnGroup> getIndexColumnHints() {
- return this.hints;
- }
-
- @Override
- public Collection<KeyValue> getPendingUpdate() {
- return this.kvs;
- }
-
- /**
- * Set the {@link KeyValue}s in the update for which we are currently building an index update,
- * but don't actually apply them.
- * @param update pending {@link KeyValue}s
- */
- public void setPendingUpdates(Collection<KeyValue> update) {
- this.kvs.clear();
- this.kvs.addAll(update);
- }
-
- /**
- * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
- */
- public void applyPendingUpdates() {
- this.addUpdate(kvs);
- }
-
- /**
- * Rollback all the given values from the underlying state.
- * @param values
- */
- public void rollback(Collection<KeyValue> values) {
- for (KeyValue kv : values) {
- this.memstore.rollback(kv);
- }
- }
+ private long ts;
+ private RegionCoprocessorEnvironment env;
+ private KeyValueStore memstore;
+ private LocalHBaseState table;
+ private Mutation update;
+ private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
+ private ScannerBuilder scannerBuilder;
+ private List<Cell> kvs = new ArrayList<Cell>();
+ private List<? extends IndexedColumnGroup> hints;
+ private CoveredColumns columnSet;
+ private final Map<String,Object> context = Maps.newHashMap();
+
+ public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
+ this.env = environment;
+ this.table = table;
+ this.update = update;
+ this.memstore = new IndexMemStore();
+ this.scannerBuilder = new ScannerBuilder(memstore, update);
+ this.columnSet = new CoveredColumns();
+ }
+
+ public void addPendingUpdates(Cell... kvs) {
+ if (kvs == null) return;
+ addPendingUpdates(Arrays.asList(kvs));
+ }
+
+ public void addPendingUpdates(List<Cell> kvs) {
+ if (kvs == null) return;
+ setPendingUpdates(kvs);
+ addUpdate(kvs);
+ }
+
+ private void addUpdate(List<Cell> list) {
+ addUpdate(list, true);
+ }
+
+ private void addUpdate(List<Cell> list, boolean overwrite) {
+ if (list == null) return;
+ for (Cell kv : list) {
+ this.memstore.add(KeyValueUtil.ensureKeyValue(kv), overwrite);
+ }
+ }
+
+ @Override
+ public RegionCoprocessorEnvironment getEnvironment() {
+ return this.env;
+ }
+
+ @Override
+ public long getCurrentTimestamp() {
+ return this.ts;
+ }
+
+ /**
+ * Set the current timestamp up to which access to the underlying table state is allowed.
+ * This overrides the timestamp view provided by the indexer - use with care!
+ * @param timestamp timestamp up to which the table should allow access.
+ */
+ public void setCurrentTimestamp(long timestamp) {
+ this.ts = timestamp;
+ }
+
+ public void resetTrackedColumns() {
+ this.trackedColumns.clear();
+ }
+
+ public Set<ColumnTracker> getTrackedColumns() {
+ return this.trackedColumns;
+ }
+
+ /**
+ * Get a scanner on the columns that are needed by the index.
+ * <p>
+ * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
+ * columns with a timestamp earlier than the timestamp to which the table is currently set (the
+ * current state of the table for which we need to build an update).
+ * <p>
+ * If none of the passed columns matches any of the columns in the pending update (as determined
+ * by {@link ColumnReference#matchesFamily(byte[])} and
+ * {@link ColumnReference#matchesQualifier(byte[])}), then an empty scanner will be returned. This
+ * is because it doesn't make sense to build index updates when there is no change in the table
+ * state for any of the columns you are indexing.
+ * <p>
+ * <i>NOTE:</i> This method should <b>not</b> be used during
+ * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
+ * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
+ * need to track the indexed columns.
+ * <p>
+ * As a side-effect, we track the next-most-recent timestamp for the columns you
+ * request - you will never see a column at the timestamp we are tracking, only the next oldest
+ * timestamp for that column.
+ * @param indexedColumns the columns that will be indexed
+ * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
+ * the builder. Even if no update is necessary for the requested columns, you still need
+ * to return the {@link IndexUpdate}, just don't set the update for the
+ * {@link IndexUpdate}.
+ * @throws IOException
+ */
+ public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+ Collection<? extends ColumnReference> indexedColumns) throws IOException {
+ ensureLocalStateInitialized(indexedColumns);
+ // filter out things with a newer timestamp and track the column references to which it applies
+ ColumnTracker tracker = new ColumnTracker(indexedColumns);
+ synchronized (this.trackedColumns) {
+ // we haven't seen this set of columns before, so we need to create a new tracker
+ if (!this.trackedColumns.contains(tracker)) {
+ this.trackedColumns.add(tracker);
+ }
+ }
+
+ Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
+
+ return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
+ }
+
+ /**
+ * Initialize the managed local state. Generally, this will only be called by
+ * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
+ * then, there is still fairly low contention as each new Put/Delete will have its own table state.
+ */
+ private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+ throws IOException {
+ // check to see if we haven't initialized any columns yet
+ Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
+ // we have all the columns loaded, so we are good to go.
+ if (toCover.isEmpty()) { return; }
+
+ // add the current state of the row
+ this.addUpdate(this.table.getCurrentRowState(update, toCover).listCells(), false);
+
+ // add the covered columns to the set
+ for (ColumnReference ref : toCover) {
+ this.columnSet.addColumn(ref);
+ }
+ }
+
+ @Override
+ public Map<String, byte[]> getUpdateAttributes() {
+ return this.update.getAttributesMap();
+ }
+
+ @Override
+ public byte[] getCurrentRowKey() {
+ return this.update.getRow();
+ }
+
+ public Result getCurrentRowState() {
+ KeyValueScanner scanner = this.memstore.getScanner();
+ List<Cell> kvs = new ArrayList<Cell>();
+ while (scanner.peek() != null) {
+ try {
+ kvs.add(scanner.next());
+ } catch (IOException e) {
+ // this should never happen - something has gone terribly awry if it has
+ throw new RuntimeException("Local MemStore threw IOException!");
+ }
+ }
+ return Result.create(kvs);
+ }
+
+ /**
+ * Helper to add a {@link Mutation} to the values stored for the current row
+ *
+ * @param pendingUpdate
+ * update to apply
+ */
+ public void addUpdateForTesting(Mutation pendingUpdate) {
+ for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
+ List<Cell> edits = e.getValue();
+ addUpdate(edits);
+ }
+ }
+
+ /**
+ * @param hints
+ */
+ public void setHints(List<? extends IndexedColumnGroup> hints) {
+ this.hints = hints;
+ }
+
+ @Override
+ public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+ return this.hints;
+ }
+
+ @Override
+ public Collection<Cell> getPendingUpdate() {
+ return this.kvs;
+ }
+
+ /**
+ * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually
+ * apply them.
+ *
+ * @param update
+ * pending {@link KeyValue}s
+ */
+ public void setPendingUpdates(Collection<Cell> update) {
+ this.kvs.clear();
+ this.kvs.addAll(update);
+ }
+
+ /**
+ * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
+ */
+ public void applyPendingUpdates() {
+ this.addUpdate(kvs);
+ }
+
+ /**
+ * Rollback all the given values from the underlying state.
+ *
+ * @param values
+ */
+ public void rollback(Collection<Cell> values) {
+ for (Cell kv : values) {
+ this.memstore.rollback(KeyValueUtil.ensureKeyValue(kv));
+ }
+ }
+
+ @Override
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ throws IOException {
+ Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+ ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
+ return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
+ }
+
+ @Override
+ public Map<String, Object> getContext() {
+ return context;
+ }
}
\ No newline at end of file
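
Pulling the LocalTableState pieces together, the per-row flow a builder follows is roughly: construct the state, stage the batch with setPendingUpdates, apply it, then ask for the update state for the indexed columns. The sketch below assumes caller-supplied env/table/mutation objects; the timestamp, family, and qualifier are placeholders.

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

public class LocalTableStateSketch {

    public Pair<ValueGetter, IndexUpdate> buildForRow(RegionCoprocessorEnvironment env,
            LocalHBaseState table, Mutation m, List<Cell> batchCells, long batchTs)
            throws IOException {
        // one LocalTableState per row - it is a single-use, non-thread-safe object
        LocalTableState state = new LocalTableState(env, table, m);
        state.setCurrentTimestamp(batchTs);   // view the row as of the batch timestamp
        state.setPendingUpdates(batchCells);  // stage the batch without applying it
        state.applyPendingUpdates();          // roll the staged cells into the row view

        // placeholder indexed column (family "0", qualifier "NAME")
        ColumnReference col = new ColumnReference(Bytes.toBytes("0"), Bytes.toBytes("NAME"));
        return state.getIndexUpdateState(Collections.singletonList(col));
    }
}

If only the pre-update view were needed (as in the cleanup path for deletes), the builder would instead follow up with state.rollback(batchCells) to hide the staged cells again.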