You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/09 09:08:15 UTC
[1/2] phoenix git commit: PHOENIX-1830 Transactional mutable
secondary indexes
Repository: phoenix
Updated Branches:
refs/heads/txn 0ad5c5675 -> e1521ab05
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
new file mode 100644
index 0000000..dd71a58
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.hbase.index.MultiMutation;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.trace.TracingUtils;
+import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction
+ * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a
+ * bit simpler than the non transactional case. For example, there's no need to muck with the WAL, as failure scenarios
+ * are handled by aborting the transaction.
+ */
+public class PhoenixTransactionalIndexer extends BaseRegionObserver {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
+
+ private PhoenixIndexCodec codec;
+ private IndexWriter writer;
+ private boolean stopped;
+
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
+ final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
+ String serverName = env.getRegionServerServices().getServerName().getServerName();
+ codec = new PhoenixIndexCodec();
+ codec.initialize(env);
+
+ // setup the actual index writer
+ this.writer = new IndexWriter(env, serverName + "-tx-index-writer");
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment e) throws IOException {
+ if (this.stopped) { return; }
+ this.stopped = true;
+ String msg = "TxIndexer is being stopped";
+ this.writer.stop(msg);
+ }
+
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+
+ Mutation m = miniBatchOp.getOperation(0);
+ if (!codec.isEnabled(m)) {
+ super.preBatchMutate(c, miniBatchOp);
+ return;
+ }
+
+ Collection<Pair<Mutation, byte[]>> indexUpdates = null;
+ // get the current span, or just use a null-span to avoid a bunch of if statements
+ try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+ Span current = scope.getSpan();
+ if (current == null) {
+ current = NullSpan.INSTANCE;
+ }
+
+ // get the index updates for all elements in this batch
+ indexUpdates = getIndexUpdates(c.getEnvironment(), miniBatchOp);
+
+ current.addTimelineAnnotation("Built index updates, doing preStep");
+ TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
+
+ // no index updates, so we are done
+ if (!indexUpdates.isEmpty()) {
+ this.writer.write(indexUpdates);
+ }
+ } catch (Throwable t) {
+ LOG.error("Failed to update index with entries:" + indexUpdates, t);
+ IndexManagementUtil.rethrowIndexingException(t);
+ }
+ }
+
+ private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ // Collect the set of mutable ColumnReferences so that we can first
+ // run a scan to get the current state. We'll need this to delete
+ // the existing index rows.
+ Map<String,byte[]> updateAttributes = miniBatchOp.getOperation(0).getAttributesMap();
+ PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(env,updateAttributes);
+ Transaction tx = indexMetaData.getTransaction();
+ assert(tx != null);
+ List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
+ Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
+ for (IndexMaintainer indexMaintainer : indexMaintainers) {
+ if (!indexMaintainer.isImmutableRows()) {
+ mutableColumns.addAll(indexMaintainer.getAllColumns());
+ }
+ }
+ ResultScanner scanner = null;
+ TransactionAwareHTable txTable = null;
+
+ // Collect up all mutations in batch
+ Map<ImmutableBytesPtr, MultiMutation> mutations =
+ new HashMap<ImmutableBytesPtr, MultiMutation>();
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ // add the mutation to the batch set
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ MultiMutation stored = mutations.get(row);
+ // we haven't seen this row before, so add it
+ if (stored == null) {
+ stored = new MultiMutation(row);
+ mutations.put(row, stored);
+ }
+ stored.addAll(m);
+ }
+
+ try {
+ if (!mutableColumns.isEmpty()) {
+ List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
+ for (ImmutableBytesPtr ptr : mutations.keySet()) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
+ }
+ Scan scan = new Scan();
+ ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+ scanRanges.initializeScan(scan);
+ scan.setFilter(scanRanges.getSkipScanFilter());
+ TableName tableName = env.getRegion().getRegionInfo().getTable();
+ HTableInterface htable = env.getTable(tableName);
+ txTable = new TransactionAwareHTable(htable);
+ txTable.startTx(tx);
+ scanner = txTable.getScanner(scan);
+ }
+ } finally {
+ if (txTable != null) txTable.close();
+ }
+
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
+ if (scanner == null) {
+ for (Mutation m : mutations.values()) {
+ TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
+ state.applyMutation(m);
+ Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
+ for (IndexUpdate update : updates) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+ }
+ }
+ } else {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
+ Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+ for (IndexUpdate delete : deletes) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+ }
+ Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
+ state.applyMutation(m);
+ Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
+ for (IndexUpdate update : updates) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+ }
+ }
+ }
+ return indexUpdates;
+ }
+
+
+ private static class TxTableState implements TableState {
+ private final byte[] rowKey;
+ private final long currentTimestamp;
+ private final RegionCoprocessorEnvironment env;
+ private final Map<String, byte[]> attributes;
+ private final List<Cell> pendingUpdates;
+ private final Set<ColumnReference> indexedColumns;
+ private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
+
+ private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, byte[] rowKey) {
+ this.env = env;
+ this.currentTimestamp = currentTimestamp;
+ this.indexedColumns = indexedColumns;
+ this.attributes = attributes;
+ this.rowKey = rowKey;
+ int estimatedSize = indexedColumns.size();
+ this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
+ this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
+ }
+
+ public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m) {
+ this(env, indexedColumns, attributes, currentTimestamp, m.getRow());
+ applyMutation(m);
+ }
+
+ public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Result r) {
+ this(env, indexedColumns, attributes, currentTimestamp, r.getRow());
+
+ for (ColumnReference ref : indexedColumns) {
+ Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+ if (cell != null) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ valueMap.put(ref, ptr);
+ }
+ }
+ }
+
+ @Override
+ public RegionCoprocessorEnvironment getEnvironment() {
+ return env;
+ }
+
+ @Override
+ public long getCurrentTimestamp() {
+ return currentTimestamp;
+ }
+
+ @Override
+ public Map<String, byte[]> getUpdateAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public byte[] getCurrentRowKey() {
+ return rowKey;
+ }
+
+ @Override
+ public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+ return Collections.emptyList();
+ }
+
+ public void applyMutation(Mutation m) {
+ if (m instanceof Delete) {
+ valueMap.clear();
+ } else {
+ CellScanner scanner = m.cellScanner();
+ try {
+ while (scanner.advance()) {
+ Cell cell = scanner.current();
+ if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+ ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ valueMap.remove(ref);
+ } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+ for (ColumnReference ref : indexedColumns) {
+ if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
+ valueMap.remove(ref);
+ }
+ }
+ } else {
+ ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ valueMap.put(ref, ptr);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
+ }
+
+ @Override
+ public Collection<Cell> getPendingUpdate() {
+ return pendingUpdates;
+ }
+
+ @Override
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ throws IOException {
+ // TODO: creating these objects over and over again is wasteful
+ ColumnTracker tracker = new ColumnTracker(indexedColumns);
+ ValueGetter getter = new ValueGetter() {
+
+ @Override
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
+ return valueMap.get(ref);
+ }
+
+ @Override
+ public byte[] getRowKey() {
+ return rowKey;
+ }
+
+ };
+ Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
+ return pair;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
deleted file mode 100644
index c471df6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.util.IndexUtil;
-
-public class PhoenixTxIndexBuilder extends TxIndexBuilder {
- @Override
- public void setup(RegionCoprocessorEnvironment env) throws IOException {
- super.setup(env);
- Configuration conf = env.getConfiguration();
- // Install failure policy that just re-throws exception instead of killing RS
- // or disabling the index
- conf.set(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, PhoenixTxIndexFailurePolicy.class.getName());
- }
-
- @Override
- public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- // The entire purpose of this method impl is to get the existing rows for the
- // table rows being indexed into the block cache, as the index maintenance code
- // does a point scan per row.
- // TODO: provide a means for the transactional case to just return the Scanner
- // for when this is executed as it seems like that would be more efficient.
- IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
- }
-
- private PhoenixIndexCodec getCodec() {
- return (PhoenixIndexCodec)this.codec;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
deleted file mode 100644
index fa70cc9..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
-
-import com.google.common.collect.Multimap;
-
-public class PhoenixTxIndexFailurePolicy implements IndexFailurePolicy {
- private Stoppable parent;
-
- @Override
- public void stop(String why) {
- if (parent != null) {
- parent.stop(why);
- }
- }
-
- @Override
- public boolean isStopped() {
- return parent == null ? false : parent.isStopped();
- }
-
- @Override
- public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
- this.parent = parent;
- }
-
- @Override
- public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
- throws IOException {
- if (cause instanceof IOException) {
- throw (IOException)cause;
- } else if (cause instanceof RuntimeException) { throw (RuntimeException)cause; }
- throw new IOException(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dae7939..20ab602 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -112,7 +112,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.index.PhoenixTxIndexBuilder;
+import org.apache.phoenix.index.PhoenixTransactionalIndexer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -705,11 +705,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318).
if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW)
&& !SchemaUtil.isMetaTable(tableName)
- && !SchemaUtil.isStatsTable(tableName)
- && !descriptor.hasCoprocessor(Indexer.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(descriptor, isTransactional ? PhoenixTxIndexBuilder.class : PhoenixIndexBuilder.class, opts, priority);
+ && !SchemaUtil.isStatsTable(tableName)) {
+ if (isTransactional) {
+ if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+ descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
+ }
+ } else {
+ if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
+ }
}
if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index b317d77..8c2e990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
@@ -238,7 +239,8 @@ public interface QueryConstants {
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
// Install split policy to prevent a tenant's metadata from being split across regions.
- HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
public static final String CREATE_STATS_TABLE_METADATA =
"CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" +
@@ -259,7 +261,8 @@ public interface QueryConstants {
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_STAT_DATA_VERSIONS + ",\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
// Install split policy to prevent a physical table's stats from being split across regions.
- HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
public static final String CREATE_SEQUENCE_METADATA =
"CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" +
@@ -277,5 +280,6 @@ public interface QueryConstants {
LIMIT_REACHED_FLAG + " BOOLEAN \n" +
" CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
- HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n";
+ HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index 265fc78..ee06179 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -17,12 +17,23 @@
*/
package org.apache.phoenix.trace;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.commons.configuration.Configuration;
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,20 +42,16 @@ import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.phoenix.metrics.*;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.QueryUtil;
-import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
/**
* Write the metrics to a phoenix table.
@@ -169,7 +176,9 @@ public class PhoenixMetricsSink implements MetricsSink {
TAG_COUNT + " smallint, " +
ANNOTATION_COUNT + " smallint" +
" CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", "
- + PARENT.columnName + ", " + SPAN.columnName + "))\n";
+ + PARENT.columnName + ", " + SPAN.columnName + "))\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+;
PreparedStatement stmt = conn.prepareStatement(ddl);
stmt.execute();
this.table = table;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 1fcf16a..45729a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -24,8 +24,6 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,15 +41,12 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.IndexStatementRewriter;
-import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -65,16 +60,13 @@ import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -644,58 +636,4 @@ public class IndexUtil {
return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString())
: col.getExpressionStr();
}
-
- /*
- * The entire purpose of this method is to get the existing rows for the table rows being indexed into the
- * block cache, as the index maintenance code does a point scan per row. Though for the transactional
- * case we may be loading more than we need, since we're not applying the transaction filters, that
- * should still be ok.
- */
- public static void loadMutatingRowsIntoBlockCache(HRegion region, PhoenixIndexCodec codec, MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean useRawScan)
- throws IOException {
- List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
- Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
- new HashMap<ImmutableBytesWritable, IndexMaintainer>();
- ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
- for (int i = 0; i < miniBatchOp.size(); i++) {
- Mutation m = miniBatchOp.getOperation(i);
- keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
- List<IndexMaintainer> indexMaintainers = codec.getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
-
- for(IndexMaintainer indexMaintainer: indexMaintainers) {
- if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
- indexTableName.set(indexMaintainer.getIndexTableName());
- if (maintainers.get(indexTableName) != null) continue;
- maintainers.put(indexTableName, indexMaintainer);
- }
-
- }
- if (maintainers.isEmpty()) return;
- Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
- scan.setRaw(useRawScan);
- ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
- scanRanges.initializeScan(scan);
- scan.setFilter(scanRanges.getSkipScanFilter());
- RegionScanner scanner = region.getScanner(scan);
- // Run through the scanner using internal nextRaw method
- region.startRegionOperation();
- try {
- synchronized (scanner) {
- boolean hasMore;
- do {
- List<Cell> results = Lists.newArrayList();
- // Results are potentially returned even when the return value of s.next is
- // false since this is an indication of whether or not there are more values
- // after the ones returned
- hasMore = scanner.nextRaw(results);
- } while (hasMore);
- }
- } finally {
- try {
- scanner.close();
- } finally {
- region.closeRegionOperation();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index 7e73a81..0575b3f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -16,7 +16,7 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.index.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
/**
* An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless of the current
@@ -41,12 +41,12 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
}
@Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
return this.deletes;
}
@Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
return this.updates;
}
@@ -59,7 +59,4 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
public boolean isEnabled(Mutation m) {
return true;
}
-
- @Override
- public void setContext(TableState state, Mutation mutation) throws IOException {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index f0c1483..e4654ea 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
@@ -180,14 +181,14 @@ public class TestCoveredColumnIndexCodec {
// check the codec for deletes it should send
LocalTableState state = new LocalTableState(env, table, p);
- Iterable<IndexUpdate> updates = codec.getIndexDeletes(state);
+ Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next()
.isValid());
// get the updates with the pending update
state.setCurrentTimestamp(1);
state.addPendingUpdates(kvs);
- updates = codec.getIndexUpserts(state);
+ updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
assertTrue("Didn't find index updates for pending primary table update!", updates.iterator()
.hasNext());
for (IndexUpdate update : updates) {
@@ -210,7 +211,7 @@ public class TestCoveredColumnIndexCodec {
state = new LocalTableState(env, table, d);
state.setCurrentTimestamp(2);
// check the cleanup of the current table, after the puts (mocking a 'next' update)
- updates = codec.getIndexDeletes(state);
+ updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
for (IndexUpdate update : updates) {
assertTrue("Didn't have any index cleanup, even though there is current state",
update.isValid());
@@ -240,7 +241,7 @@ public class TestCoveredColumnIndexCodec {
state.setCurrentTimestamp(d.getTimeStamp());
// now we shouldn't see anything when getting the index update
state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY));
- Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+ Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
for (IndexUpdate update : updates) {
assertFalse("Had some index updates, though it should have been covered by the delete",
update.isValid());
[2/2] phoenix git commit: PHOENIX-1830 Transactional mutable
secondary indexes
Posted by ja...@apache.org.
PHOENIX-1830 Transactional mutable secondary indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e1521ab0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e1521ab0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e1521ab0
Branch: refs/heads/txn
Commit: e1521ab059b51a81738e5b9fd8390034dd1b9f1c
Parents: 0ad5c56
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Apr 9 00:08:11 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Apr 9 00:08:11 2015 -0700
----------------------------------------------------------------------
.../end2end/index/TxGlobalMutableIndexIT.java | 41 +++
.../EndToEndCoveredColumnsIndexBuilderIT.java | 8 +-
.../covered/example/FailWithoutRetriesIT.java | 12 +-
.../org/apache/phoenix/hbase/index/Indexer.java | 54 ---
.../phoenix/hbase/index/MultiMutation.java | 86 +++++
.../hbase/index/builder/BaseIndexBuilder.java | 23 +-
.../hbase/index/builder/BaseIndexCodec.java | 47 +++
.../hbase/index/builder/IndexBuildManager.java | 18 +-
.../hbase/index/builder/IndexBuilder.java | 19 +-
.../phoenix/hbase/index/covered/IndexCodec.java | 23 +-
.../hbase/index/covered/IndexMetaData.java | 22 ++
.../hbase/index/covered/LocalTableState.java | 10 +-
.../hbase/index/covered/NonTxIndexBuilder.java | 65 ++--
.../phoenix/hbase/index/covered/TableState.java | 2 -
.../hbase/index/covered/TxIndexBuilder.java | 247 --------------
.../example/CoveredColumnIndexCodec.java | 10 +-
.../covered/example/CoveredColumnIndexer.java | 6 +-
.../apache/phoenix/index/BaseIndexCodec.java | 59 ----
.../phoenix/index/PhoenixIndexBuilder.java | 83 ++++-
.../apache/phoenix/index/PhoenixIndexCodec.java | 160 ++++-----
.../phoenix/index/PhoenixIndexMetaData.java | 97 ++++++
.../index/PhoenixTransactionalIndexer.java | 339 +++++++++++++++++++
.../phoenix/index/PhoenixTxIndexBuilder.java | 53 ---
.../index/PhoenixTxIndexFailurePolicy.java | 50 ---
.../query/ConnectionQueryServicesImpl.java | 19 +-
.../apache/phoenix/query/QueryConstants.java | 10 +-
.../phoenix/trace/PhoenixMetricsSink.java | 41 ++-
.../java/org/apache/phoenix/util/IndexUtil.java | 62 ----
.../covered/CoveredIndexCodecForTesting.java | 9 +-
.../example/TestCoveredColumnIndexCodec.java | 9 +-
30 files changed, 909 insertions(+), 775 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
new file mode 100644
index 0000000..2d22b0f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import java.util.Map;
+
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+
+import com.google.common.collect.Maps;
+
+public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT {
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseMutableIndexIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
+ // Forces server cache to be used
+ props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+ props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+ props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index fa85f00..1cdd508 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -172,15 +172,15 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
private Queue<TableStateVerifier> verifiers = new ArrayDeque<TableStateVerifier>();
@Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
verify(state);
- return super.getIndexDeletes(state);
+ return super.getIndexDeletes(state, context);
}
@Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
verify(state);
- return super.getIndexUpserts(state);
+ return super.getIndexUpserts(state, context);
}
private void verify(TableState state) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
index 281ad63..6367945 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -32,10 +31,11 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.hbase.index.IndexTestingUtils;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.index.BaseIndexCodec;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -63,18 +63,14 @@ public class FailWithoutRetriesIT {
public static class FailingTestCodec extends BaseIndexCodec {
@Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName());
}
@Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName());
}
-
- @Override
- public void setContext(TableState state, Mutation mutation) throws IOException {}
-
}
@BeforeClass
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index a4fc96b..d79ffb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -25,12 +25,10 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -282,58 +280,6 @@ public class Indexer extends BaseRegionObserver {
}
}
- private class MultiMutation extends Mutation {
-
- private ImmutableBytesPtr rowKey;
-
- public MultiMutation(ImmutableBytesPtr rowkey) {
- this.rowKey = rowkey;
- }
-
- /**
- * @param stored
- */
- public void addAll(Mutation stored) {
- // add all the kvs
- for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) {
- byte[] family = kvs.getKey();
- List<Cell> list = getKeyValueList(family, kvs.getValue().size());
- list.addAll(kvs.getValue());
- familyMap.put(family, list);
- }
-
- // add all the attributes, not overriding already stored ones
- for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) {
- if (this.getAttribute(attrib.getKey()) == null) {
- this.setAttribute(attrib.getKey(), attrib.getValue());
- }
- }
- }
-
- private List<Cell> getKeyValueList(byte[] family, int hint) {
- List<Cell> list = familyMap.get(family);
- if (list == null) {
- list = new ArrayList<Cell>(hint);
- }
- return list;
- }
-
- @Override
- public byte[] getRow(){
- return this.rowKey.copyBytesIfNecessary();
- }
-
- @Override
- public int hashCode() {
- return this.rowKey.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- return o == null ? false : o.hashCode() == this.hashCode();
- }
- }
-
/**
* Add the index updates to the WAL, or write to the index table, if the WAL has been disabled
* @return <tt>true</tt> if the WAL has been updated.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java
new file mode 100644
index 0000000..f6381c4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+public class MultiMutation extends Mutation {
+
+ private ImmutableBytesPtr rowKey;
+
+ public MultiMutation(ImmutableBytesPtr rowkey) {
+ this.rowKey = rowkey;
+ }
+
+ /**
+ * @param stored
+ */
+ public void addAll(Mutation stored) {
+ // add all the kvs
+ for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) {
+ byte[] family = kvs.getKey();
+ List<Cell> list = getKeyValueList(family, kvs.getValue().size());
+ list.addAll(kvs.getValue());
+ familyMap.put(family, list);
+ }
+
+ // add all the attributes, not overriding already stored ones
+ for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) {
+ if (this.getAttribute(attrib.getKey()) == null) {
+ this.setAttribute(attrib.getKey(), attrib.getValue());
+ }
+ }
+ }
+
+ private List<Cell> getKeyValueList(byte[] family, int hint) {
+ List<Cell> list = familyMap.get(family);
+ if (list == null) {
+ list = new ArrayList<Cell>(hint);
+ }
+ return list;
+ }
+
+ @Override
+ public byte[] getRow(){
+ return this.rowKey.copyBytesIfNecessary();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((rowKey == null) ? 0 : rowKey.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ MultiMutation other = (MultiMutation)obj;
+ return rowKey.equals(other.rowKey);
+ }
+
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index dfb9ad4..4e329e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
@@ -40,8 +41,6 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
protected RegionCoprocessorEnvironment env;
protected IndexCodec codec;
- abstract protected boolean useRawScanToPrimeBlockCache();
-
@Override
public void extendBaseIndexBuilderInstead() {}
@@ -65,9 +64,14 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
}
@Override
- public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
// noop
}
+
+ @Override
+ public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ return IndexMetaData.NULL_INDEX_META_DATA;
+ }
@Override
public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
@@ -98,22 +102,11 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
}
@Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData context)
throws IOException {
throw new UnsupportedOperationException();
}
- /**
- * {@inheritDoc}
- * <p>
- * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each mutation always applies
- * to different rows, even if they are in the same batch, or are independent updates.
- */
- @Override
- public byte[] getBatchId(Mutation m) {
- return this.codec.getBatchId(m);
- }
-
@Override
public void stop(String why) {
LOG.debug("Stopping because: " + why);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
new file mode 100644
index 0000000..1ce4e2e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.builder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.covered.IndexCodec;
+
+/**
+ *
+ */
+public abstract class BaseIndexCodec implements IndexCodec {
+
+ @Override
+ public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+ // noop
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * By default, the codec is always enabled. Subclasses should override this method if they want do
+ * decide to index on a per-mutation basis.
+ * @throws IOException
+ */
+ @Override
+ public boolean isEnabled(Mutation m) throws IOException {
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index d5fd34d..ae2125e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;
@@ -115,7 +116,8 @@ public class IndexBuildManager implements Stoppable {
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
// notify the delegate that we have started processing a batch
- this.delegate.batchStarted(miniBatchOp);
+ final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
+ this.delegate.batchStarted(miniBatchOp, indexMetaData);
// parallelize each mutation into its own task
// each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
@@ -129,7 +131,7 @@ public class IndexBuildManager implements Stoppable {
@Override
public Collection<Pair<Mutation, byte[]>> call() throws IOException {
- return delegate.getIndexUpdate(m);
+ return delegate.getIndexUpdate(m, indexMetaData);
}
});
@@ -156,28 +158,24 @@ public class IndexBuildManager implements Stoppable {
}
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered) throws IOException {
+ Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
// this is run async, so we can take our time here
- return delegate.getIndexUpdateForFilteredRows(filtered);
+ return delegate.getIndexUpdateForFilteredRows(filtered, indexMetaData);
}
public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
delegate.batchCompleted(miniBatchOp);
}
- public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp)
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData indexMetaData)
throws IOException {
- delegate.batchStarted(miniBatchOp);
+ delegate.batchStarted(miniBatchOp, indexMetaData);
}
public boolean isEnabled(Mutation m) throws IOException {
return delegate.isEnabled(m);
}
- public byte[] getBatchId(Mutation m) {
- return delegate.getBatchId(m);
- }
-
@Override
public void stop(String why) {
if (stopped) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 194fdcc..36aba77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
/**
* Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
@@ -64,6 +65,7 @@ public interface IndexBuilder extends Stoppable {
* Implementers must ensure that this method is thread-safe - it could (and probably will) be
* called concurrently for different mutations, which may or may not be part of the same batch.
* @param mutation update to the primary table to be indexed.
+ * @param context TODO
* @return a Map of the mutations to make -> target index table name
* @throws IOException on failure
*/
@@ -76,7 +78,7 @@ public interface IndexBuilder extends Stoppable {
Noop Failure mode
*/
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
/**
* Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
@@ -94,12 +96,13 @@ public interface IndexBuilder extends Stoppable {
*
* @param filtered {@link KeyValue}s that previously existed, but won't be included
* in further output from HBase.
+ * @param context TODO
*
* @return a {@link Map} of the mutations to make -> target index table name
* @throws IOException on failure
*/
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered)
+ Collection<KeyValue> filtered, IndexMetaData context)
throws IOException;
/**
@@ -115,10 +118,13 @@ public interface IndexBuilder extends Stoppable {
* <i>after</i> the {@link #getIndexUpdate} methods. Therefore, you will likely need an attribute
* on your {@link Put}/{@link Delete} to indicate it is a batch operation.
* @param miniBatchOp the full batch operation to be written
+ * @param context TODO
* @throws IOException
*/
- public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException;
+ public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
+
/**
* This allows the codec to dynamically change whether or not indexing should take place for a
* table. If it doesn't take place, we can save a lot of time on the regular Put patch. By making
@@ -133,11 +139,4 @@ public interface IndexBuilder extends Stoppable {
* @throws IOException
*/
public boolean isEnabled(Mutation m) throws IOException;
-
- /**
- * @param m mutation that has been received by the indexer and is waiting to be indexed
- * @return the ID of batch to which the Mutation belongs, or <tt>null</tt> if the mutation is not
- * part of a batch.
- */
- public byte[] getBatchId(Mutation m);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index e3ef831..93de11e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -14,7 +14,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.index.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
/**
* Codec for creating index updates from the current state of a table.
@@ -49,10 +49,11 @@ public interface IndexCodec {
* @param state
* the current state of the table that needs to be cleaned up. Generally, you only care about the latest
* column values, for each column you are indexing for each index table.
+ * @param context TODO
* @return the pairs of (deletes, index table name) that should be applied.
* @throws IOException
*/
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException;
// table state has the pending update already applied, before calling
// get the new index entries
@@ -68,10 +69,11 @@ public interface IndexCodec {
* @param state
* the current state of the table that needs to an index update Generally, you only care about the latest
* column values, for each column you are indexing for each index table.
+ * @param context TODO
* @return the pairs of (updates,index table name) that should be applied.
* @throws IOException
*/
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException;
/**
* This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
@@ -88,19 +90,4 @@ public interface IndexCodec {
* @throws IOException
*/
public boolean isEnabled(Mutation m) throws IOException;
-
- /**
- * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of
- * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently.
- * <p>
- * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each
- * batch. Otherwise, we cannot guarantee index correctness</b>
- *
- * @param m
- * mutation that may or may not be part of the batch
- * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
- */
- public byte[] getBatchId(Mutation m);
-
- public void setContext(TableState state, Mutation mutation) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
new file mode 100644
index 0000000..ee25a40
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+public interface IndexMetaData {
+ public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {};
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index f47a71a..2da5771 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -36,8 +36,6 @@ import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import com.google.common.collect.Maps;
-
/**
* Manage the state of the HRegion's view of the table, for the single row.
* <p>
@@ -58,7 +56,6 @@ public class LocalTableState implements TableState {
private List<Cell> kvs = new ArrayList<Cell>();
private List<? extends IndexedColumnGroup> hints;
private CoveredColumns columnSet;
- private final Map<String,Object> context = Maps.newHashMap();
public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
this.env = environment;
@@ -132,7 +129,7 @@ public class LocalTableState implements TableState {
* state for any of the columns you are indexing.
* <p>
* <i>NOTE:</i> This method should <b>not</b> be used during
- * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
+ * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been
* applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
* need to track the indexed columns.
* <p>
@@ -275,9 +272,4 @@ public class LocalTableState implements TableState {
ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
}
-
- @Override
- public Map<String, Object> getContext() {
- return context;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 4ba3671..11e7d1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -62,14 +62,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
}
@Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
// create a state manager, so we can manage each batch
LocalTableState state = new LocalTableState(env, localTable, mutation);
- codec.setContext(state, mutation);
// build the index updates for each group
IndexUpdateManager manager = new IndexUpdateManager();
- batchMutationAndAddUpdates(manager, state, mutation);
+ batchMutationAndAddUpdates(manager, state, mutation, indexMetaData);
if (LOG.isDebugEnabled()) {
LOG.debug("Found index updates for Mutation: " + mutation + "\n" + manager);
@@ -84,16 +83,17 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
* don't all have the timestamp, so we need to manage everything in batches based on timestamp.
* <p>
* Adds all the updates in the {@link Mutation} to the state, as a side-effect.
- *
- * @param updateMap
- * index updates into which to add new updates. Modified as a side-effect.
* @param state
* current state of the row for the mutation.
* @param m
* mutation to batch
+ * @param indexMetaData TODO
+ * @param updateMap
+ * index updates into which to add new updates. Modified as a side-effect.
+ *
* @throws IOException
*/
- private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m) throws IOException {
+ private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException {
// split the mutation into timestamp-based batches
Collection<Batch> batches = createTimestampBatchesFromMutation(m);
@@ -106,7 +106,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
* group will see that as the current state, which will can cause the a delete and a put to be created for
* the next group.
*/
- if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
+ if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) {
cleanupCurrentState = false;
}
}
@@ -197,12 +197,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
* <tt>true</tt> if we should should attempt to cleanup the current state of the table, in the event of a
* 'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier
* batch already did the cleanup.
+ * @param indexMetaData TODO
* @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt>
* otherwise
* @throws IOException
*/
private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
- boolean requireCurrentStateCleanup) throws IOException {
+ boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException {
// need a temporary manager for the current batch. It should resolve any conflicts for the
// current batch. Essentially, we can get the case where a batch doesn't change the current
@@ -214,18 +215,18 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
// determine if we need to make any cleanup given the pending update.
long batchTs = batch.getTimestamp();
state.setPendingUpdates(batch.getKvs());
- addCleanupForCurrentBatch(updateMap, batchTs, state);
+ addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
// A.2 do a single pass first for the updates to the current state
state.applyPendingUpdates();
- long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
+ long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData);
// if all the updates are the latest thing in the index, we are done - don't go and fix history
if (ColumnTracker.isNewestTime(minTs)) { return false; }
// A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
// index. after this, we have the correct view of the index, from the batch up to the index
while (!ColumnTracker.isNewestTime(minTs)) {
- minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
+ minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData);
}
// B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
@@ -240,7 +241,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
// cleanup the pending batch. If anything in the correct history is covered by Deletes used to
// 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
// because the update may have a different set of columns or value based on the update).
- cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
+ cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData);
// have to roll the state forward again, so the current state is correct
state.applyPendingUpdates();
@@ -249,18 +250,18 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
return false;
}
- private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap)
+ private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData)
throws IOException {
state.setCurrentTimestamp(ts);
- ts = addCurrentStateMutationsForBatch(updateMap, state);
+ ts = addCurrentStateMutationsForBatch(updateMap, state, indexMetaData);
return ts;
}
- private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+ private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
throws IOException {
// get the cleanup for the current state
state.setCurrentTimestamp(batchTs);
- addDeleteUpdatesToMap(updateMap, state, batchTs);
+ addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
// ignore any index tracking from the delete
state.resetTrackedColumns();
}
@@ -271,19 +272,20 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
*
* @param updateMap
* to update with index mutations
- * @param batch
- * to apply to the current state
* @param state
* current state of the table
+ * @param indexMetaData TODO
+ * @param batch
+ * to apply to the current state
* @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)}
* returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>.
* @throws IOException
*/
- private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state)
+ private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state, IndexMetaData indexMetaData)
throws IOException {
// get the index updates for this current batch
- Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
+ Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData);
state.resetTrackedColumns();
/*
@@ -346,13 +348,14 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
* @param state
* current state of the primary table. Should already by setup to the correct state from which we want to
* cleanup.
+ * @param indexMetaData TODO
* @throws IOException
*/
- private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+ private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
throws IOException {
// get the cleanup for the current state
state.setCurrentTimestamp(batchTs);
- addDeleteUpdatesToMap(updateMap, state, batchTs);
+ addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
Set<ColumnTracker> trackers = state.getTrackedColumns();
long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
for (ColumnTracker tracker : trackers) {
@@ -363,21 +366,22 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
state.resetTrackedColumns();
if (!ColumnTracker.isNewestTime(minTs)) {
state.setHints(Lists.newArrayList(trackers));
- cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
+ cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData);
}
}
/**
- * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then add them to the
+ * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
* update map.
* <p>
* Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc).
+ * @param indexMetaData TODO
*
* @throws IOException
*/
- protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts)
+ protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
throws IOException {
- Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
+ Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData);
if (cleanup != null) {
for (IndexUpdate d : cleanup) {
if (!d.isValid()) {
@@ -392,14 +396,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
}
@Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData indexMetaData)
throws IOException {
// TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
return null;
}
-
- @Override
- protected boolean useRawScanToPrimeBlockCache() {
- return false;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index b8b2f19..d8b215c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -80,6 +80,4 @@ public interface TableState {
* @return the keyvalues in the pending update to the table.
*/
Collection<Cell> getPendingUpdate();
-
- Map<String,Object> getContext();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
deleted file mode 100644
index d90edc1..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class TxIndexBuilder extends BaseIndexBuilder {
- private static final Log LOG = LogFactory.getLog(TxIndexBuilder.class);
- public static final String TRANSACTION = "TRANSACTION";
- private TransactionAwareHTable txTable;
-
- @Override
- public void setup(RegionCoprocessorEnvironment env) throws IOException {
- super.setup(env);
- HTableInterface htable = env.getTable(env.getRegion().getRegionInfo().getTable());
- this.txTable = new TransactionAwareHTable(htable); // TODO: close?
- }
-
- @Override
- public void stop(String why) {
- try {
- if (this.txTable != null) txTable.close();
- } catch (IOException e) {
- LOG.warn("Unable to close txTable", e);
- } finally {
- super.stop(why);
- }
- }
-
- @Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
- // get the index updates for this current batch
- TxTableState state = new TxTableState(mutation);
- codec.setContext(state, mutation);
- Transaction tx = (Transaction)state.getContext().get(TRANSACTION);
- state.setCurrentTransaction(tx);
- Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayListWithExpectedSize(2);
- Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state);
- for (IndexUpdate delete : deletes) {
- indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
- }
- state.addPendingUpdates(mutation);
- // TODO: state may need to maintain the old state with the new state super imposed
- // An alternate easier way would be to calculate the state after the data mutations
- // have been applied.
- Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
- for (IndexUpdate update : updates) {
- indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
- }
- return indexUpdates;
- }
-
- private class TxTableState implements TableState {
- private Put put;
- private Map<String, byte[]> attributes;
- private List<Cell> pendingUpdates = Lists.newArrayList();
- private Transaction transaction;
- private final Map<String,Object> context = Maps.newHashMap();
-
- public TxTableState(Mutation m) {
- this.put = new Put(m.getRow());
- this.attributes = m.getAttributesMap();
- }
-
- @Override
- public RegionCoprocessorEnvironment getEnvironment() {
- return env;
- }
-
- @Override
- public long getCurrentTimestamp() {
- return transaction.getReadPointer();
- }
-
- @Override
- public Map<String, byte[]> getUpdateAttributes() {
- return attributes;
- }
-
- @Override
- public byte[] getCurrentRowKey() {
- return put.getRow();
- }
-
- @Override
- public List<? extends IndexedColumnGroup> getIndexColumnHints() {
- return Collections.emptyList();
- }
-
- public void addPendingUpdate(Cell cell) throws IOException {
- put.add(cell);
- }
-
- public void addPendingUpdates(Mutation m) throws IOException {
- if (m instanceof Delete) {
- put.getFamilyCellMap().clear();
- } else {
- CellScanner scanner = m.cellScanner();
- while (scanner.advance()) {
- Cell cell = scanner.current();
- if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
- byte[] family = CellUtil.cloneFamily(cell);
- byte[] qualifier = CellUtil.cloneQualifier(cell);
- put.add(family, qualifier, HConstants.EMPTY_BYTE_ARRAY);
- } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
- byte[] family = CellUtil.cloneFamily(cell);
- put.getFamilyCellMap().remove(family);
- } else {
- put.add(cell);
- }
- }
- }
- }
-
- @Override
- public Collection<Cell> getPendingUpdate() {
- return pendingUpdates;
- }
-
- public void setCurrentTransaction(Transaction tx) {
- this.transaction = tx;
- }
-
- @Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
- throws IOException {
- ColumnTracker tracker = new ColumnTracker(indexedColumns);
- IndexUpdate indexUpdate = new IndexUpdate(tracker);
- final byte[] rowKey = getCurrentRowKey();
- if (!pendingUpdates.isEmpty()) {
- final Map<ColumnReference, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
- .size());
- for (Cell kv : pendingUpdates) {
- // create new pointers to each part of the kv
- ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
- valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
- }
- ValueGetter getter = new ValueGetter() {
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
- // TODO: from IndexMaintainer we return null if ref is empty key value. Needed?
- return valueMap.get(ref);
- }
- @Override
- public byte[] getRowKey() {
- return rowKey;
- }
- };
- return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
- }
- // Establish initial state of table by finding the old values
- // We'll apply the Mutation to this next
- Get get = new Get(rowKey);
- get.setMaxVersions();
- for (ColumnReference ref : indexedColumns) {
- get.addColumn(ref.getFamily(), ref.getQualifier());
- }
- txTable.startTx(transaction);
- final Result result = txTable.get(get);
- ValueGetter getter = new ValueGetter() {
-
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
- Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
- if (cell == null) {
- return null;
- }
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- return ptr;
- }
-
- @Override
- public byte[] getRowKey() {
- return rowKey;
- }
-
- };
- for (ColumnReference ref : indexedColumns) {
- Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
- if (cell != null) {
- addPendingUpdate(cell);
- }
- }
- return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
- }
-
- @Override
- public Map<String, Object> getContext() {
- return context;
- }
-
- }
-
- @Override
- protected boolean useRawScanToPrimeBlockCache() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index d4bd460..59bc8de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.scanner.Scanner;
-import org.apache.phoenix.index.BaseIndexCodec;
import com.google.common.collect.Lists;
@@ -59,7 +60,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
}
@Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
for (ColumnGroup group : groups) {
IndexUpdate update = getIndexUpdateForGroup(group, state);
@@ -113,7 +114,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
}
@Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
for (ColumnGroup group : groups) {
deletes.add(getDeleteForGroup(group, state));
@@ -360,7 +361,4 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
// simple check for the moment.
return groups.size() > 0;
}
-
- @Override
- public void setContext(TableState state, Mutation mutation) throws IOException {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index de8f752..60698c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.covered.Batch;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
@@ -118,8 +119,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
@Override
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered) throws IOException {
-
+ Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
// stores all the return values
IndexUpdateManager updateMap = new IndexUpdateManager();
// batch the updates by row to make life easier and ordered
@@ -146,7 +146,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
for (Batch entry : timeBatch) {
//just set the timestamp on the table - it already has all the future state
state.setCurrentTimestamp(entry.getTimestamp());
- this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp());
+ this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp(), indexMetaData);
}
}
return updateMap.toMap();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
deleted file mode 100644
index 1c45cd3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
-
-/**
- *
- */
-public abstract class BaseIndexCodec implements IndexCodec {
-
- @Override
- public void initialize(RegionCoprocessorEnvironment env) throws IOException {
- // noop
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * By default, the codec is always enabled. Subclasses should override this method if they want do
- * decide to index on a per-mutation basis.
- * @throws IOException
- */
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- return true;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Assumes each mutation is not in a batch. Subclasses that have different batching behavior
- * should override this.
- */
- @Override
- public byte[] getBatchId(Mutation m) {
- return null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 1e28766..09a9f90 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -18,14 +18,32 @@
package org.apache.phoenix.index;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
/**
* Index builder for covered-columns index that ties into phoenix for faster use.
@@ -33,6 +51,15 @@ import org.apache.phoenix.util.IndexUtil;
public class PhoenixIndexBuilder extends NonTxIndexBuilder {
@Override
+ public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
+ }
+
+ protected PhoenixIndexCodec getCodec() {
+ return (PhoenixIndexCodec)codec;
+ }
+
+ @Override
public void setup(RegionCoprocessorEnvironment env) throws IOException {
super.setup(env);
Configuration conf = env.getConfiguration();
@@ -43,16 +70,54 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
}
@Override
- public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
// The entire purpose of this method impl is to get the existing rows for the
// table rows being indexed into the block cache, as the index maintenance code
// does a point scan per row.
- // TODO: provide a means for the transactional case to just return the Scanner
- // for when this is executed as it seems like that would be more efficient.
- IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
- }
-
- private PhoenixIndexCodec getCodec() {
- return (PhoenixIndexCodec)this.codec;
+ List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+ List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+ Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
+ new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+ ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
+
+ for(IndexMaintainer indexMaintainer: indexMaintainers) {
+ if (indexMaintainer.isImmutableRows()) continue;
+ indexTableName.set(indexMaintainer.getIndexTableName());
+ if (maintainers.get(indexTableName) != null) continue;
+ maintainers.put(indexTableName, indexMaintainer);
+ }
+
+ }
+ if (maintainers.isEmpty()) return;
+ Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
+ scan.setRaw(true);
+ ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+ scanRanges.initializeScan(scan);
+ scan.setFilter(scanRanges.getSkipScanFilter());
+ HRegion region = env.getRegion();
+ RegionScanner scanner = region.getScanner(scan);
+ // Run through the scanner using internal nextRaw method
+ region.startRegionOperation();
+ try {
+ synchronized (scanner) {
+ boolean hasMore;
+ do {
+ List<Cell> results = Lists.newArrayList();
+ // Results are potentially returned even when the return value of s.next is
+ // false since this is an indication of whether or not there are more values
+ // after the ones returned
+ hasMore = scanner.nextRaw(results);
+ } while (hasMore);
+ }
+ } finally {
+ try {
+ scanner.close();
+ } finally {
+ region.closeRegionOperation();
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 571c559..1fe9931 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -10,53 +10,41 @@
package org.apache.phoenix.index;
import java.io.IOException;
-import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import co.cask.tephra.Transaction;
-
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.cache.TenantCache;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TransactionUtil;
import com.google.common.collect.Lists;
/**
* Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index (
- * {@link #getIndexDeletes(TableState)}) as well as what the new index state should be (
- * {@link #getIndexUpserts(TableState)}).
+ * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be (
+ * {@link #getIndexUpserts(TableState, IndexMetaData)}).
*/
public class PhoenixIndexCodec extends BaseIndexCodec {
public static final String INDEX_MD = "IdxMD";
public static final String INDEX_UUID = "IdxUUID";
public static final String INDEX_MAINTAINERS = "IndexMaintainers";
+ private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
private RegionCoprocessorEnvironment env;
- private KeyValueBuilder kvBuilder = GenericKeyValueBuilder.INSTANCE;;
@Override
public void initialize(RegionCoprocessorEnvironment env) throws IOException {
@@ -71,67 +59,78 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
return true;
}
- public IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException {
- if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
- byte[] uuid = attributes.get(INDEX_UUID);
- if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
- byte[] md = attributes.get(INDEX_MD);
- byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
- if (md != null) {
- final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
- final Transaction txn = TransactionUtil.decodeTxnState(txState);
- return new IndexMetaDataCache() {
-
- @Override
- public void close() throws IOException {}
-
- @Override
- public List<IndexMaintainer> getIndexMaintainers() {
- return indexMaintainers;
- }
-
- @Override
- public Transaction getTransaction() {
- return txn;
- }
-
- };
- } else {
- byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
- ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
- TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
- IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
- if (indexCache == null) {
- String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
- SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
- .setMessage(msg).build().buildException();
- ServerUtil.throwIOException("Index update failed", e); // will not return
+ @Override
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
+ List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+ if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
+ return Collections.emptyList();
+ }
+ // TODO: confirm that this special case isn't needed
+ // (as state should match this with the above call, since there are no mutable columns)
+ /*
+ if (maintainer.isImmutableRows()) {
+ indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
+ indexUpdate.setTable(maintainer.getIndexTableName());
+ valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
+ }
+ */
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ptr.set(state.getCurrentRowKey());
+ List<IndexUpdate> indexUpdates = Lists.newArrayList();
+ for (IndexMaintainer maintainer : indexMaintainers) {
+ if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes
+ assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
}
- return indexCache;
+ Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+ ValueGetter valueGetter = statePair.getFirst();
+ IndexUpdate indexUpdate = statePair.getSecond();
+ indexUpdate.setTable(maintainer.getIndexTableName());
+ Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
+ .getRegion().getStartKey(), env.getRegion().getEndKey());
+ indexUpdate.setUpdate(put);
+ indexUpdates.add(indexUpdate);
}
+ return indexUpdates;
+ }
+ @Override
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
+ List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ptr.set(state.getCurrentRowKey());
+ List<IndexUpdate> indexUpdates = Lists.newArrayList();
+ for (IndexMaintainer maintainer : indexMaintainers) {
+ if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes
+ assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
+ }
+ if (maintainer.isImmutableRows()) {
+ continue;
+ }
+ Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+ ValueGetter valueGetter = statePair.getFirst();
+ IndexUpdate indexUpdate = statePair.getSecond();
+ indexUpdate.setTable(maintainer.getIndexTableName());
+ Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
+ state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
+ indexUpdate.setUpdate(delete);
+ indexUpdates.add(indexUpdate);
+ }
+ return indexUpdates;
}
+ /*
@Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
- return getIndexUpdates(state, true);
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state, BatchContext context) throws IOException {
+ return getIndexUpdates(state, context, true);
}
@Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
- return getIndexUpdates(state, false);
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state, BatchContext context) throws IOException {
+ return getIndexUpdates(state, context, false);
}
- /**
- * @param state
- * @param upsert
- * prepare index upserts if it's true otherwise prepare index deletes.
- * @return
- * @throws IOException
- */
- private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
- @SuppressWarnings("unchecked")
- List<IndexMaintainer> indexMaintainers = (List<IndexMaintainer>)state.getContext().get(INDEX_MAINTAINERS);
+ private Iterable<IndexUpdate> getIndexUpdates(TableState state, BatchContext context, boolean upsert) throws IOException {
+ List<IndexMaintainer> indexMaintainers = ((PhoenixBatchContext)context).getIndexMetaData().getIndexMaintainers();
if (indexMaintainers.isEmpty()) { return Collections.emptyList(); }
List<IndexUpdate> indexUpdates = Lists.newArrayList();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -165,10 +164,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
}
Mutation mutation = null;
if (upsert) {
- mutation = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), env
+ mutation = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
.getRegion().getStartKey(), env.getRegion().getEndKey());
} else {
- mutation = maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(),
+ mutation = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
}
indexUpdate.setUpdate(mutation);
@@ -180,25 +179,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
}
return indexUpdates;
}
+ */
@Override
public boolean isEnabled(Mutation m) throws IOException {
return hasIndexMaintainers(m.getAttributesMap());
}
-
- @Override
- public byte[] getBatchId(Mutation m) {
- Map<String, byte[]> attributes = m.getAttributesMap();
- return attributes.get(INDEX_UUID);
- }
-
- @Override
- public void setContext(TableState state, Mutation mutation) throws IOException {
- IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
- List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
- Map<String,Object> context = state.getContext();
- context.clear();
- context.put(INDEX_MAINTAINERS, indexMaintainers);
- context.put(TxIndexBuilder.TRANSACTION, indexCache.getTransaction());
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
new file mode 100644
index 0000000..26c1c12
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import co.cask.tephra.Transaction;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.IndexMetaDataCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
+
+public class PhoenixIndexMetaData implements IndexMetaData {
+ private final IndexMetaDataCache indexMetaDataCache;
+
+ private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
+ if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+ byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
+ if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+ byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+ byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
+ if (md != null) {
+ final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+ final Transaction txn = TransactionUtil.decodeTxnState(txState);
+ return new IndexMetaDataCache() {
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ return txn;
+ }
+
+ };
+ } else {
+ byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
+ ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
+ TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
+ IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
+ if (indexCache == null) {
+ String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
+ SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
+ .setMessage(msg).build().buildException();
+ ServerUtil.throwIOException("Index update failed", e); // will not return
+ }
+ return indexCache;
+ }
+
+ }
+
+ public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
+ indexMetaDataCache = getIndexMetaData(env, attributes);
+ }
+
+ public Transaction getTransaction() {
+ return indexMetaDataCache.getTransaction();
+ }
+
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return indexMetaDataCache.getIndexMaintainers();
+ }
+}