You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/02 05:00:00 UTC
[26/50] [abbrv] phoenix git commit: PHOENIX-2446 Immutable index -
Index vs base table row count does not match when index is created during
data load
PHOENIX-2446 Immutable index - Index vs base table row count does not match when index is created during data load
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a138cfe0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a138cfe0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a138cfe0
Branch: refs/heads/calcite
Commit: a138cfe0f3df47091a0d9fe0285a8e572d76b252
Parents: 8574d43
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Jan 8 15:37:31 2016 -0800
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Thu Jan 21 20:34:18 2016 -0800
----------------------------------------------------------------------
.../phoenix/end2end/index/ImmutableIndexIT.java | 81 ++++++++---
.../apache/phoenix/execute/PartialCommitIT.java | 2 +-
.../cache/aggcache/SpillableGroupByCache.java | 17 +--
.../apache/phoenix/compile/FromCompiler.java | 8 +-
.../phoenix/compile/ListJarsQueryPlan.java | 2 +-
.../compile/PostLocalIndexDDLCompiler.java | 104 +++++++++++++
.../apache/phoenix/compile/TraceQueryPlan.java | 2 +-
.../apache/phoenix/compile/UnionCompiler.java | 2 +-
.../phoenix/coprocessor/BaseRegionScanner.java | 11 +-
.../GroupedAggregateRegionObserver.java | 38 +----
.../coprocessor/MetaDataEndpointImpl.java | 7 +-
.../coprocessor/MetaDataRegionObserver.java | 8 +-
.../phoenix/coprocessor/ScanRegionObserver.java | 17 +--
.../UngroupedAggregateRegionObserver.java | 24 +--
.../coprocessor/generated/PTableProtos.java | 145 +++++++++++++++----
.../apache/phoenix/execute/MutationState.java | 57 ++++----
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../phoenix/query/QueryServicesOptions.java | 9 +-
.../apache/phoenix/schema/DelegateColumn.java | 5 +
.../apache/phoenix/schema/MetaDataClient.java | 133 +++++++----------
.../java/org/apache/phoenix/schema/PColumn.java | 2 +
.../org/apache/phoenix/schema/PColumnImpl.java | 21 ++-
.../apache/phoenix/schema/PMetaDataImpl.java | 2 +-
.../org/apache/phoenix/schema/SaltingUtil.java | 2 +-
.../phoenix/execute/CorrelatePlanTest.java | 2 +-
.../phoenix/execute/UnnestArrayPlanTest.java | 4 +-
.../expression/ColumnExpressionTest.java | 8 +-
.../iterate/AggregateResultScannerTest.java | 4 +
.../phoenix/query/QueryServicesTestImpl.java | 4 +-
phoenix-protocol/src/main/MetaDataService.proto | 2 +-
phoenix-protocol/src/main/PTable.proto | 1 +
31 files changed, 449 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 7171382..c18e4ab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end.index;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -34,6 +35,7 @@ import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
@@ -55,6 +58,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -68,6 +72,7 @@ import com.google.common.collect.Maps;
public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
private final boolean localIndex;
+ private final boolean transactional;
private final String tableDDLOptions;
private final String tableName;
private final String indexName;
@@ -80,6 +85,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
public ImmutableIndexIT(boolean localIndex, boolean transactional) {
this.localIndex = localIndex;
+ this.transactional = transactional;
StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
if (transactional) {
optionBuilder.append(", TRANSACTIONAL=true");
@@ -98,16 +104,55 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName());
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+ clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000");
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
@Parameters(name="localIndex = {0} , transactional = {1}")
public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] {
- { false, true }, { true, true }
- });
+ return Arrays.asList(new Boolean[][] {
+ { false, false }, { false, true },
+ { true, false }, { true, true } });
}
+ @Test
+ @Ignore
+ public void testDropIfImmutableKeyValueColumn() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String ddl =
+ "CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+ populateTestTable(fullTableName);
+ ddl =
+ "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON "
+ + fullTableName + " (long_col1)";
+ stmt.execute(ddl);
+
+ ResultSet rs;
+
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+
+ conn.setAutoCommit(true);
+ String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
+ try {
+ conn.createStatement().execute(dml);
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(),
+ e.getErrorCode());
+ }
+
+ conn.createStatement().execute("DROP TABLE " + fullTableName);
+ }
+ }
@Test
public void testCreateIndexDuringUpsertSelect() throws Exception {
@@ -119,8 +164,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
+ " (long_pk, varchar_pk)"
+ " INCLUDE (long_col1, long_col2)";
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(false);
Statement stmt = conn.createStatement();
stmt.execute(ddl);
@@ -133,7 +177,6 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) " +
"SELECT varchar_pk||'_upsert_select', char_pk, int_pk, long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME;
conn.createStatement().execute(upsertSelect);
-
ResultSet rs;
rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
assertTrue(rs.next());
@@ -142,9 +185,6 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
assertTrue(rs.next());
assertEquals(440,rs.getInt(1));
}
- finally {
- conn.close();
- }
}
// used to create an index while a batch of rows are being written
@@ -156,7 +196,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
String tableName = c.getEnvironment().getRegion().getRegionInfo()
.getTable().getNameAsString();
if (tableName.equalsIgnoreCase(TABLE_NAME)
- // create the index after the second batch of 1000 rows
+ // create the index after the second batch
&& Bytes.startsWith(put.getRow(), Bytes.toBytes("varchar200_upsert_select"))) {
try {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -171,13 +211,14 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
}
private static class UpsertRunnable implements Runnable {
- private static final int NUM_ROWS_IN_BATCH = 10000;
+ private static final int NUM_ROWS_IN_BATCH = 1000;
private final String fullTableName;
public UpsertRunnable(String fullTableName) {
this.fullTableName = fullTableName;
}
+ @Override
public void run() {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
@@ -190,12 +231,9 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
fistRowInBatch = false;
}
conn.commit();
- Thread.sleep(500);
}
} catch (SQLException e) {
throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
}
}
}
@@ -213,10 +251,17 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
Statement stmt = conn.createStatement();
stmt.execute(ddl);
- ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+ ExecutorService executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(numThreads);
for (int i =0; i<numThreads; ++i) {
- futureList.add(threadPool.submit(new UpsertRunnable(fullTableName)));
+ futureList.add(executorService.submit(new UpsertRunnable(fullTableName)));
}
// upsert some rows before creating the index
Thread.sleep(500);
@@ -235,8 +280,8 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
for (Future<?> future : futureList) {
future.cancel(true);
}
- threadPool.shutdownNow();
- threadPool.awaitTermination(30, TimeUnit.SECONDS);
+ executorService.shutdownNow();
+ executorService.awaitTermination(30, TimeUnit.SECONDS);
Thread.sleep(100);
ResultSet rs;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 8d7ebcb..0fb1869 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -265,7 +265,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
Connection con = driver.connect(url, new Properties());
PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
- // passing a null mutation staate forces the connection.newMutationState() to be used to create the MutationState
+ // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
return new PhoenixConnection(phxCon, null) {
@Override
protected MutationState newMutationState(int maxSize) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index 69fc6f6..8edeb3a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -340,12 +340,7 @@ public class SpillableGroupByCache implements GroupByCache {
final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();
// scanner using the spillable implementation
- return new BaseRegionScanner() {
- @Override
- public HRegionInfo getRegionInfo() {
- return s.getRegionInfo();
- }
-
+ return new BaseRegionScanner(s) {
@Override
public void close() throws IOException {
try {
@@ -374,16 +369,6 @@ public class SpillableGroupByCache implements GroupByCache {
SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
return cacheIter.hasNext();
}
-
- @Override
- public long getMaxResultSize() {
- return s.getMaxResultSize();
- }
-
- @Override
- public int getBatch() {
- return s.getBatch();
- }
};
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 9b2c460..dd93c81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -221,7 +221,7 @@ public class FromCompiler {
Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression();
PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(),
sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
- column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp());
+ column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic());
projectedColumns.add(projectedColumn);
}
PTable t = PTableImpl.makePTable(table, projectedColumns);
@@ -406,7 +406,7 @@ public class FromCompiler {
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
PName tenantId = connection.getTenantId();
PTable theTable = null;
- if (updateCacheImmediately || connection.getAutoCommit()) {
+ if (updateCacheImmediately) {
MetaDataMutationResult result = client.updateCache(schemaName, tableName);
timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
theTable = result.getTable();
@@ -547,7 +547,7 @@ public class FromCompiler {
familyName = PNameFactory.newName(family);
}
allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(),
- dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false));
+ dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true));
position++;
}
theTable = PTableImpl.makePTable(theTable, allcolumns);
@@ -645,7 +645,7 @@ public class FromCompiler {
}
PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias),
PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY),
- null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false);
+ null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false);
columns.add(column);
}
PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index dac691f..f2b4856 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -80,7 +80,7 @@ public class ListJarsQueryPlan implements QueryPlan {
PColumn column =
new PColumnImpl(PNameFactory.newName("jar_location"), null,
PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
- false, null, false);
+ false, null, false, false);
List<PColumn> columns = new ArrayList<PColumn>();
columns.add(column);
Expression expression =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
new file mode 100644
index 0000000..f92738c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
+
+import com.google.common.collect.Lists;
+
+/**
+ * For local indexes, we optimize the initial index population by *not* sending
+ * Puts over the wire for the index rows, as we don't need to do that. Instead,
+ * we tap into our region observer to generate the index rows based on the data
+ * rows as we scan
+ */
+public class PostLocalIndexDDLCompiler {
+ private final PhoenixConnection connection;
+ private final String tableName;
+
+ public PostLocalIndexDDLCompiler(PhoenixConnection connection, String tableName) {
+ this.connection = connection;
+ this.tableName = tableName;
+ }
+
+ public MutationPlan compile(final PTable index) throws SQLException {
+ try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+ String query = "SELECT count(*) FROM " + tableName;
+ final QueryPlan plan = statement.compileQuery(query);
+ TableRef tableRef = plan.getTableRef();
+ Scan scan = plan.getContext().getScan();
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ final PTable dataTable = tableRef.getTable();
+ List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
+ // Only build newly created index.
+ indexes.add(index);
+ IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection());
+ // Set attribute on scan that UngroupedAggregateRegionObserver will switch on.
+ // We'll detect that this attribute was set the server-side and write the index
+ // rows per region as a result. The value of the attribute will be our persisted
+ // index maintainers.
+ // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver
+ scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+ // However, in this case, we need to project all of the data columns that contribute to the index.
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+ for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+ scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+ }
+
+ // Go through MutationPlan abstraction so that we can create local indexes
+ // with a connectionless connection (which makes testing easier).
+ return new BaseMutationPlan(plan.getContext(), Operation.UPSERT) {
+
+ @Override
+ public MutationState execute() throws SQLException {
+ connection.getMutationState().commitDDLFence(dataTable);
+ Cell kv = plan.iterator().next().getValue(0);
+ ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ // A single Cell will be returned with the count(*) - we decode that here
+ long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+ // The contract is to return a MutationState that contains the number of rows modified. In this
+ // case, it's the number of rows in the data table which corresponds to the number of index
+ // rows that were added.
+ return new MutationState(0, connection, rowCount);
+ }
+
+ };
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index a9754b3..1e8210a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -77,7 +77,7 @@ public class TraceQueryPlan implements QueryPlan {
PColumn column =
new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null,
PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
- false, null, false);
+ false, null, false, false);
List<PColumn> columns = new ArrayList<PColumn>();
columns.add(column);
Expression expression =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index 3bc1e37..f8b2778 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -75,7 +75,7 @@ public class UnionCompiler {
String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias();
PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME,
sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
- i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false);
+ i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false, false);
projectedColumns.add(projectedColumn);
}
Long scn = statement.getConnection().getSCN();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index 3f73048..b5e9c9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -25,8 +25,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-public abstract class BaseRegionScanner implements RegionScanner {
+public abstract class BaseRegionScanner extends DelegateRegionScanner {
+ public BaseRegionScanner(RegionScanner delegate) {
+ super(delegate);
+ }
+
@Override
public boolean isFilterDone() {
return false;
@@ -46,11 +50,6 @@ public abstract class BaseRegionScanner implements RegionScanner {
}
@Override
- public long getMvccReadPoint() {
- return Long.MAX_VALUE;
- }
-
- @Override
public boolean nextRaw(List<Cell> result) throws IOException {
return next(result);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index d613688..3237882 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -315,15 +315,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
aggResults.add(keyValue);
}
// scanner using the non spillable, memory-only implementation
- return new BaseRegionScanner() {
+ return new BaseRegionScanner(s) {
private int index = 0;
@Override
- public HRegionInfo getRegionInfo() {
- return s.getRegionInfo();
- }
-
- @Override
public void close() throws IOException {
try {
s.close();
@@ -341,16 +336,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
index++;
return index < aggResults.size();
}
-
- @Override
- public long getMaxResultSize() {
- return s.getMaxResultSize();
- }
-
- @Override
- public int getBatch() {
- return s.getBatch();
- }
};
}
@@ -471,21 +456,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
}
- return new BaseRegionScanner() {
+ return new BaseRegionScanner(scanner) {
private long rowCount = 0;
private ImmutableBytesWritable currentKey = null;
@Override
- public HRegionInfo getRegionInfo() {
- return scanner.getRegionInfo();
- }
-
- @Override
- public void close() throws IOException {
- scanner.close();
- }
-
- @Override
public boolean next(List<Cell> results) throws IOException {
boolean hasMore;
boolean atLimit;
@@ -567,15 +542,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
currentKey = null;
return false;
}
-
- @Override
- public long getMaxResultSize() {
- return scanner.getMaxResultSize();
- }
- @Override
- public int getBatch() {
- return scanner.getBatch();
- }
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9887e7b..78f9700 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -109,9 +110,12 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.GlobalCache;
@@ -188,6 +192,7 @@ import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.UpgradeUtil;
+import org.hamcrest.core.IsInstanceOf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -620,7 +625,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
isRowTimestampKV.getValueLength()));
- PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp);
+ PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false);
columns.add(column);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 7950ac8..a2f7282 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -22,6 +22,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -252,7 +253,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
}
if (conn == null) {
- conn = DriverManager.getConnection(getJdbcUrl(env)).unwrap(PhoenixConnection.class);
+ final Properties props = new Properties();
+ // don't run a second index populations upsert select
+ props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
+ conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
}
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -270,7 +274,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
-
+
LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 65a43de..d2bd3b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -253,7 +253,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
} finally {
region.closeRegionOperation();
}
- return new BaseRegionScanner() {
+ return new BaseRegionScanner(s) {
private Tuple tuple = firstTuple;
@Override
@@ -262,11 +262,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
}
@Override
- public HRegionInfo getRegionInfo() {
- return s.getRegionInfo();
- }
-
- @Override
public boolean next(List<Cell> results) throws IOException {
try {
if (isFilterDone()) {
@@ -301,16 +296,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
}
}
}
-
- @Override
- public long getMaxResultSize() {
- return s.getMaxResultSize();
- }
-
- @Override
- public int getBatch() {
- return s.getBatch();
- }
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f332e60..05cf08e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -554,25 +554,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
final KeyValue aggKeyValue = keyValue;
- RegionScanner scanner = new BaseRegionScanner() {
+ RegionScanner scanner = new BaseRegionScanner(innerScanner) {
private boolean done = !hadAny;
@Override
- public HRegionInfo getRegionInfo() {
- return innerScanner.getRegionInfo();
- }
-
- @Override
public boolean isFilterDone() {
return done;
}
@Override
- public void close() throws IOException {
- innerScanner.close();
- }
-
- @Override
public boolean next(List<Cell> results) throws IOException {
if (done) return false;
done = true;
@@ -584,11 +574,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
-
- @Override
- public int getBatch() {
- return innerScanner.getBatch();
- }
};
return scanner;
}
@@ -690,7 +675,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
final KeyValue aggKeyValue =
KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
- RegionScanner scanner = new BaseRegionScanner() {
+ RegionScanner scanner = new BaseRegionScanner(innerScanner) {
@Override
public HRegionInfo getRegionInfo() {
return region.getRegionInfo();
@@ -716,11 +701,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
-
- @Override
- public int getBatch() {
- return innerScanner.getBatch();
- }
};
return scanner;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index be8d7e2..f74ed0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -259,6 +259,16 @@ public final class PTableProtos {
* <code>optional bool isRowTimestamp = 13;</code>
*/
boolean getIsRowTimestamp();
+
+ // optional bool isDynamic = 14;
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ boolean hasIsDynamic();
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ boolean getIsDynamic();
}
/**
* Protobuf type {@code PColumn}
@@ -376,6 +386,11 @@ public final class PTableProtos {
isRowTimestamp_ = input.readBool();
break;
}
+ case 112: {
+ bitField0_ |= 0x00002000;
+ isDynamic_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -678,6 +693,22 @@ public final class PTableProtos {
return isRowTimestamp_;
}
+ // optional bool isDynamic = 14;
+ public static final int ISDYNAMIC_FIELD_NUMBER = 14;
+ private boolean isDynamic_;
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ public boolean hasIsDynamic() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ public boolean getIsDynamic() {
+ return isDynamic_;
+ }
+
private void initFields() {
columnNameBytes_ = com.google.protobuf.ByteString.EMPTY;
familyNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -692,6 +723,7 @@ public final class PTableProtos {
viewReferenced_ = false;
expression_ = "";
isRowTimestamp_ = false;
+ isDynamic_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -764,6 +796,9 @@ public final class PTableProtos {
if (((bitField0_ & 0x00001000) == 0x00001000)) {
output.writeBool(13, isRowTimestamp_);
}
+ if (((bitField0_ & 0x00002000) == 0x00002000)) {
+ output.writeBool(14, isDynamic_);
+ }
getUnknownFields().writeTo(output);
}
@@ -825,6 +860,10 @@ public final class PTableProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(13, isRowTimestamp_);
}
+ if (((bitField0_ & 0x00002000) == 0x00002000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(14, isDynamic_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -913,6 +952,11 @@ public final class PTableProtos {
result = result && (getIsRowTimestamp()
== other.getIsRowTimestamp());
}
+ result = result && (hasIsDynamic() == other.hasIsDynamic());
+ if (hasIsDynamic()) {
+ result = result && (getIsDynamic()
+ == other.getIsDynamic());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -978,6 +1022,10 @@ public final class PTableProtos {
hash = (37 * hash) + ISROWTIMESTAMP_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getIsRowTimestamp());
}
+ if (hasIsDynamic()) {
+ hash = (37 * hash) + ISDYNAMIC_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getIsDynamic());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1113,6 +1161,8 @@ public final class PTableProtos {
bitField0_ = (bitField0_ & ~0x00000800);
isRowTimestamp_ = false;
bitField0_ = (bitField0_ & ~0x00001000);
+ isDynamic_ = false;
+ bitField0_ = (bitField0_ & ~0x00002000);
return this;
}
@@ -1193,6 +1243,10 @@ public final class PTableProtos {
to_bitField0_ |= 0x00001000;
}
result.isRowTimestamp_ = isRowTimestamp_;
+ if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+ to_bitField0_ |= 0x00002000;
+ }
+ result.isDynamic_ = isDynamic_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1252,6 +1306,9 @@ public final class PTableProtos {
if (other.hasIsRowTimestamp()) {
setIsRowTimestamp(other.getIsRowTimestamp());
}
+ if (other.hasIsDynamic()) {
+ setIsDynamic(other.getIsDynamic());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1819,6 +1876,39 @@ public final class PTableProtos {
return this;
}
+ // optional bool isDynamic = 14;
+ private boolean isDynamic_ ;
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ public boolean hasIsDynamic() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ public boolean getIsDynamic() {
+ return isDynamic_;
+ }
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ public Builder setIsDynamic(boolean value) {
+ bitField0_ |= 0x00002000;
+ isDynamic_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool isDynamic = 14;</code>
+ */
+ public Builder clearIsDynamic() {
+ bitField0_ = (bitField0_ & ~0x00002000);
+ isDynamic_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:PColumn)
}
@@ -6909,38 +6999,39 @@ public final class PTableProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\014PTable.proto\032\021PGuidePosts.proto\"\223\002\n\007PC" +
+ "\n\014PTable.proto\032\021PGuidePosts.proto\"\246\002\n\007PC" +
"olumn\022\027\n\017columnNameBytes\030\001 \002(\014\022\027\n\017family" +
"NameBytes\030\002 \001(\014\022\020\n\010dataType\030\003 \002(\t\022\021\n\tmax" +
"Length\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010nullable\030" +
"\006 \002(\010\022\020\n\010position\030\007 \002(\005\022\021\n\tsortOrder\030\010 \002" +
"(\005\022\021\n\tarraySize\030\t \001(\005\022\024\n\014viewConstant\030\n " +
"\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\022\022\n\nexpressio" +
- "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\"\232\001\n\013PTab" +
- "leStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014\022\033\n\023" +
- "guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCou",
- "nt\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n\013pGu" +
- "idePosts\030\006 \001(\0132\014.PGuidePosts\"\244\005\n\006PTable\022" +
- "\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNameByt" +
- "es\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTableType" +
- "\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenceNumber\030\005" +
- " \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013pkNameBytes\030\007" +
- " \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022\031\n\007columns\030\t \003(\013" +
- "2\010.PColumn\022\030\n\007indexes\030\n \003(\0132\007.PTable\022\027\n\017" +
- "isImmutableRows\030\013 \002(\010\022 \n\nguidePosts\030\014 \003(" +
- "\0132\014.PTableStats\022\032\n\022dataTableNameBytes\030\r ",
- "\001(\014\022\031\n\021defaultFamilyName\030\016 \001(\014\022\022\n\ndisabl" +
- "eWAL\030\017 \002(\010\022\023\n\013multiTenant\030\020 \002(\010\022\020\n\010viewT" +
- "ype\030\021 \001(\014\022\025\n\rviewStatement\030\022 \001(\014\022\025\n\rphys" +
- "icalNames\030\023 \003(\014\022\020\n\010tenantId\030\024 \001(\014\022\023\n\013vie" +
- "wIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n\016sta" +
- "tsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(\010\022\027\n" +
- "\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrderOpt" +
- "imizable\030\032 \001(\010\022\025\n\rtransactional\030\033 \001(\010\022\034\n" +
- "\024updateCacheFrequency\030\034 \001(\003*A\n\nPTableTyp" +
- "e\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005IND",
- "EX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoenix.cop" +
- "rocessor.generatedB\014PTableProtosH\001\210\001\001\240\001\001"
+ "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\022\021\n\tisDyn" +
+ "amic\030\016 \001(\010\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022" +
+ "\016\n\006values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003",
+ " \001(\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePost" +
+ "sCount\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGui" +
+ "dePosts\"\244\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " +
+ "\002(\014\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType" +
+ "\030\003 \002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022" +
+ "\026\n\016sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002" +
+ "(\003\022\023\n\013pkNameBytes\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002" +
+ "(\005\022\031\n\007columns\030\t \003(\0132\010.PColumn\022\030\n\007indexes" +
+ "\030\n \003(\0132\007.PTable\022\027\n\017isImmutableRows\030\013 \002(\010" +
+ "\022 \n\nguidePosts\030\014 \003(\0132\014.PTableStats\022\032\n\022da",
+ "taTableNameBytes\030\r \001(\014\022\031\n\021defaultFamilyN" +
+ "ame\030\016 \001(\014\022\022\n\ndisableWAL\030\017 \002(\010\022\023\n\013multiTe" +
+ "nant\030\020 \002(\010\022\020\n\010viewType\030\021 \001(\014\022\025\n\rviewStat" +
+ "ement\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010te" +
+ "nantId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tind" +
+ "exType\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\n" +
+ "storeNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(" +
+ "\005\022\036\n\026rowKeyOrderOptimizable\030\032 \001(\010\022\025\n\rtra" +
+ "nsactional\030\033 \001(\010\022\034\n\024updateCacheFrequency" +
+ "\030\034 \001(\003*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER",
+ "\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org" +
+ ".apache.phoenix.coprocessor.generatedB\014P" +
+ "TableProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6952,7 +7043,7 @@ public final class PTableProtos {
internal_static_PColumn_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PColumn_descriptor,
- new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", });
+ new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", "IsDynamic", });
internal_static_PTableStats_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_PTableStats_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0a5b053..46aa819 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -569,7 +570,7 @@ public class MutationState implements SQLCloseable {
List<Mutation> indexMutations;
try {
indexMutations =
- IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
+ IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
connection.getKeyValueBuilder(), connection);
// we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
if (!sendAll) {
@@ -719,37 +720,35 @@ public class MutationState implements SQLCloseable {
long serverTimeStamp = tableRef.getTimeStamp();
// If we're auto committing, we've already validated the schema when we got the ColumnResolver,
// so no need to do it again here.
- if (!connection.getAutoCommit()) {
- PTable table = tableRef.getTable();
- MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
- PTable resolvedTable = result.getTable();
- if (resolvedTable == null) {
- throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
- }
- // Always update tableRef table as the one we've cached may be out of date since when we executed
- // the UPSERT VALUES call and updated in the cache before this.
- tableRef.setTable(resolvedTable);
- long timestamp = result.getMutationTime();
- if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
- serverTimeStamp = timestamp;
- if (result.wasUpdated()) {
- // TODO: use bitset?
- PColumn[] columns = new PColumn[resolvedTable.getColumns().size()];
- for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
- RowMutationState valueEntry = rowEntry.getValue();
- if (valueEntry != null) {
- Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
- if (colValues != PRow.DELETE_MARKER) {
- for (PColumn column : colValues.keySet()) {
- columns[column.getPosition()] = column;
- }
+ PTable table = tableRef.getTable();
+ MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+ PTable resolvedTable = result.getTable();
+ if (resolvedTable == null) {
+ throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
+ }
+ // Always update tableRef table as the one we've cached may be out of date since when we executed
+ // the UPSERT VALUES call and updated in the cache before this.
+ tableRef.setTable(resolvedTable);
+ long timestamp = result.getMutationTime();
+ if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+ serverTimeStamp = timestamp;
+ if (result.wasUpdated()) {
+ List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
+ for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
+ RowMutationState valueEntry = rowEntry.getValue();
+ if (valueEntry != null) {
+ Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+ if (colValues != PRow.DELETE_MARKER) {
+ for (PColumn column : colValues.keySet()) {
+ if (!column.isDynamic())
+ columns.add(column);
}
}
}
- for (PColumn column : columns) {
- if (column != null) {
- resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
- }
+ }
+ for (PColumn column : columns) {
+ if (column != null) {
+ resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d40a15b..b0e7b6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -205,6 +205,8 @@ public interface QueryServices extends SQLCloseable {
public static final String HCONNECTION_POOL_MAX_SIZE = "hbase.hconnection.threads.max";
public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max";
+ // time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index)
+ public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 9257413..c9bc19b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -37,6 +37,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME;
import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
@@ -223,6 +224,8 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false;
public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = "";
+
+ public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000;
// QueryServer defaults -- ensure ThinClientUtil is also updated since phoenix-server-client
// doesn't depend on phoenix-core.
@@ -430,7 +433,6 @@ public class QueryServicesOptions {
return set(GROUPBY_SPILL_FILES_ATTRIB, num);
}
-
private QueryServicesOptions set(String name, boolean value) {
config.set(name, Boolean.toString(value));
return this;
@@ -641,4 +643,9 @@ public class QueryServicesOptions {
return this;
}
+ public QueryServicesOptions setDefaultIndexPopulationWaitTime(long waitTime) {
+ config.setLong(INDEX_POPULATION_SLEEP_TIME, waitTime);
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index ddb0a1a..798706e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -84,4 +84,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
public String toString() {
return getDelegate().toString();
}
+
+ @Override
+ public boolean isDynamic() {
+ return getDelegate().isDynamic();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index d559842..e8d995c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -114,9 +114,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import co.cask.tephra.TxConstants;
-
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -125,17 +122,15 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.BaseMutationPlan;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.IndexExpressionCompiler;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.PostDDLCompiler;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
-import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -153,7 +148,6 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.ColumnDef;
@@ -210,6 +204,8 @@ import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import co.cask.tephra.TxConstants;
+
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
@@ -821,7 +817,7 @@ public class MetaDataClient {
}
PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
- def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp);
+ def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false);
return column;
} catch (IllegalArgumentException e) { // Based on precondition check in constructor
throw new SQLException(e);
@@ -1021,89 +1017,68 @@ public class MetaDataClient {
try {
connection.setAutoCommit(true);
MutationPlan mutationPlan;
-
- // For local indexes, we optimize the initial index population by *not* sending Puts over
- // the wire for the index rows, as we don't need to do that. Instead, we tap into our
- // region observer to generate the index rows based on the data rows as we scan
if (index.getIndexType() == IndexType.LOCAL) {
- try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
- String tableName = getFullTableName(dataTableRef);
- String query = "SELECT count(*) FROM " + tableName;
- final QueryPlan plan = statement.compileQuery(query);
- TableRef tableRef = plan.getTableRef();
- // Set attribute on scan that UngroupedAggregateRegionObserver will switch on.
- // We'll detect that this attribute was set the server-side and write the index
- // rows per region as a result. The value of the attribute will be our persisted
- // index maintainers.
- // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver
- Scan scan = plan.getContext().getScan();
- try {
- if(ScanUtil.isDefaultTimeRange(scan.getTimeRange())) {
- Long scn = connection.getSCN();
- if (scn == null) {
- scn = plan.getContext().getCurrentTime();
- }
- scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(),scn);
- }
- } catch (IOException e) {
- throw new SQLException(e);
- }
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- final PTable dataTable = tableRef.getTable();
- for(PTable idx: dataTable.getIndexes()) {
- if(idx.getName().equals(index.getName())) {
- index = idx;
- break;
- }
- }
- List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
- // Only build newly created index.
- indexes.add(index);
- IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection());
- scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
- // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
- // However, in this case, we need to project all of the data columns that contribute to the index.
- IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
- for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
- scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
- }
-
- // Go through MutationPlan abstraction so that we can create local indexes
- // with a connectionless connection (which makes testing easier).
- mutationPlan = new BaseMutationPlan(plan.getContext(), Operation.UPSERT) {
-
- @Override
- public MutationState execute() throws SQLException {
- connection.getMutationState().commitDDLFence(dataTable);
- Cell kv = plan.iterator().next().getValue(0);
- ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
- // A single Cell will be returned with the count(*) - we decode that here
- long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
- // The contract is to return a MutationState that contains the number of rows modified. In this
- // case, it's the number of rows in the data table which corresponds to the number of index
- // rows that were added.
- return new MutationState(0, connection, rowCount);
- }
-
- };
- }
+ PostLocalIndexDDLCompiler compiler =
+ new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
+ mutationPlan = compiler.compile(index);
} else {
PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
mutationPlan = compiler.compile(index);
- try {
- Long scn = connection.getSCN();
+ }
+ Scan scan = mutationPlan.getContext().getScan();
+ Long scn = connection.getSCN();
+ try {
+ if (ScanUtil.isDefaultTimeRange(scan.getTimeRange())) {
if (scn == null) {
scn = mutationPlan.getContext().getCurrentTime();
}
- mutationPlan.getContext().getScan().setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn);
+ scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn);
+ }
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+
+ // execute index population upsert select
+ long startTime = System.currentTimeMillis();
+ MutationState state = connection.getQueryServices().updateData(mutationPlan);
+ long firstUpsertSelectTime = System.currentTimeMillis() - startTime;
+
+ // for global indexes on non transactional tables we might have to
+ // run a second index population upsert select to handle data rows
+ // that were being written on the server while the index was created
+ long sleepTime =
+ connection
+ .getQueryServices()
+ .getProps()
+ .getLong(QueryServices.INDEX_POPULATION_SLEEP_TIME,
+ QueryServicesOptions.DEFAULT_INDEX_POPULATION_SLEEP_TIME);
+ if (!dataTableRef.getTable().isTransactional() && sleepTime > 0) {
+ long delta = sleepTime - firstUpsertSelectTime;
+ if (delta > 0) {
+ try {
+ Thread.sleep(delta);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
+ .setRootCause(e).build().buildException();
+ }
+ }
+ // set the min timestamp of second index upsert select some time before the index
+ // was created
+ long minTimestamp = index.getTimeStamp() - firstUpsertSelectTime;
+ try {
+ mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn);
} catch (IOException e) {
throw new SQLException(e);
}
+ MutationState newMutationState =
+ connection.getQueryServices().updateData(mutationPlan);
+ state.join(newMutationState);
}
- MutationState state = connection.getQueryServices().updateData(mutationPlan);
+
indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
+ TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+ dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
alterIndex(indexStatement);
return state;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 357ce6f..0f5fa44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -58,4 +58,6 @@ public interface PColumn extends PDatum {
* @return whether this column represents/stores the hbase cell timestamp.
*/
boolean isRowTimestamp();
+
+ boolean isDynamic();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index cff276b..a556f76 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -39,6 +39,7 @@ public class PColumnImpl implements PColumn {
private boolean isViewReferenced;
private String expressionStr;
private boolean isRowTimestamp;
+ private boolean isDynamic;
public PColumnImpl() {
}
@@ -50,13 +51,13 @@ public class PColumnImpl implements PColumn {
Integer scale,
boolean nullable,
int position,
- SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp) {
- init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp);
+ SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
+ init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
}
public PColumnImpl(PColumn column, int position) {
this(column.getName(), column.getFamilyName(), column.getDataType(), column.getMaxLength(),
- column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp());
+ column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic());
}
private void init(PName name,
@@ -68,7 +69,7 @@ public class PColumnImpl implements PColumn {
int position,
SortOrder sortOrder,
Integer arrSize,
- byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp) {
+ byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
Preconditions.checkNotNull(sortOrder);
this.dataType = dataType;
if (familyName == null) {
@@ -92,6 +93,7 @@ public class PColumnImpl implements PColumn {
this.isViewReferenced = isViewReferenced;
this.expressionStr = expressionStr;
this.isRowTimestamp = isRowTimestamp;
+ this.isDynamic = isDynamic;
}
@Override
@@ -198,6 +200,11 @@ public class PColumnImpl implements PColumn {
public boolean isRowTimestamp() {
return isRowTimestamp;
}
+
+ @Override
+ public boolean isDynamic() {
+ return isDynamic;
+ }
/**
* Create a PColumn instance from PBed PColumn instance
@@ -240,8 +247,12 @@ public class PColumnImpl implements PColumn {
expressionStr = column.getExpression();
}
boolean isRowTimestamp = column.getIsRowTimestamp();
+ boolean isDynamic = false;
+ if (column.hasIsDynamic()) {
+ isDynamic = column.getIsDynamic();
+ }
return new PColumnImpl(columnName, familyName, dataType, maxLength, scale, nullable, position, sortOrder,
- arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp);
+ arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
}
public static PTableProtos.PColumn toProto(PColumn column) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 66b4af3..413d116 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -414,7 +414,7 @@ public class PMetaDataImpl implements PMetaData {
// Update position of columns that follow removed column
for (int i = position+1; i < oldColumns.size(); i++) {
PColumn oldColumn = oldColumns.get(i);
- PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp());
+ PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp(), oldColumn.isDynamic());
columns.add(newColumn);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 4ac54cb..734a9ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -38,7 +38,7 @@ public class SaltingUtil {
public static final String SALTING_COLUMN_NAME = "_SALT";
public static final String SALTED_ROW_KEY_NAME = "_SALTED_KEY";
public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
- PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false);
+ PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false);
public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2)
.addField(SALTING_COLUMN, false, SortOrder.getDefault())
.addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 72f3e01..6b89187 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -226,7 +226,7 @@ public class CorrelatePlanTest {
Expression expr = LiteralExpression.newConstant(row[i]);
columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
- i, expr.getSortOrder(), null, null, false, name, false));
+ i, expr.getSortOrder(), null, null, false, name, false, false));
}
try {
PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
index d508707..8b2b096 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -116,8 +116,8 @@ public class UnnestArrayPlanTest {
LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType);
RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0);
UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);
- PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false);
- PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false) : null;
+ PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false);
+ PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false) : null;
List<PColumn> columns = withOrdinality ? Arrays.asList(elemColumn, indexColumn) : Arrays.asList(elemColumn);
ProjectedColumnExpression elemExpr = new ProjectedColumnExpression(elemColumn, columns, 0, elemColumn.getName().getString());
ProjectedColumnExpression indexExpr = withOrdinality ? new ProjectedColumnExpression(indexColumn, columns, 1, indexColumn.getName().getString()) : null;