You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/03/06 09:28:09 UTC
phoenix git commit: Higher memory consumption on RS leading to
OOM/abort on immutable index creation with multiple regions on single RS
Repository: phoenix
Updated Branches:
refs/heads/master c8612fa1b -> 8f6d02f79
Higher memory consumption on RS leading to OOM/abort on immutable index creation with multiple regions on single RS
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8f6d02f7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8f6d02f7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8f6d02f7
Branch: refs/heads/master
Commit: 8f6d02f79871eb8a2458ca7aecfb10b3ebf34e7b
Parents: c8612fa
Author: Ankit Singhal <an...@gmail.com>
Authored: Mon Mar 6 14:58:01 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Mon Mar 6 14:58:01 2017 +0530
----------------------------------------------------------------------
.../apache/phoenix/compile/UpsertCompiler.java | 20 ++++-
.../UngroupedAggregateRegionObserver.java | 86 +++++++++++++-------
.../apache/phoenix/schema/MetaDataClient.java | 60 ++++++++++++--
3 files changed, 128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7a285a9..260e591 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -24,6 +24,7 @@ import java.sql.ParameterMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
@@ -106,6 +107,7 @@ import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -756,6 +758,10 @@ public class UpsertCompiler {
Tuple row = iterator.next();
final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
PLong.INSTANCE, ptr);
+ for (PTable index : getNewIndexes(table)) {
+ new MetaDataClient(connection).buildIndex(index, tableRef,
+ scan.getTimeRange().getMax(), scan.getTimeRange().getMax() + 1);
+ }
return new MutationState(maxSize, connection) {
@Override
public long getUpdateCount() {
@@ -767,7 +773,19 @@ public class UpsertCompiler {
}
}
-
+
+ private List<PTable> getNewIndexes(PTable table) throws SQLException {
+ List<PTable> indexes = table.getIndexes();
+ List<PTable> newIndexes = new ArrayList<PTable>(2);
+ PTable newTable = PhoenixRuntime.getTableNoCache(connection, table.getName().getString());
+ for (PTable index : newTable.getIndexes()) {
+ if (!indexes.contains(index)) {
+ newIndexes.add(index);
+ }
+ }
+ return newIndexes;
+ }
+
@Override
public ExplainPlan getExplainPlan() throws SQLException {
List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c5854d3..2dec235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -35,7 +35,6 @@ import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -59,6 +58,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -72,7 +72,6 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
-import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
@@ -127,6 +126,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
/**
@@ -288,6 +288,41 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
return s;
}
+ class MutationList extends ArrayList<Mutation> implements HeapSize {
+ private long heapSize = 0l;
+ public MutationList() {
+ super();
+ }
+
+ public MutationList(int size){
+ super(size);
+ }
+
+ @Override
+ public boolean add(Mutation e) {
+ boolean r = super.add(e);
+ if (r) {
+ incrementHeapSize(e.heapSize());
+ }
+ return r;
+ }
+
+ @Override
+ public long heapSize() {
+ return heapSize;
+ }
+
+ private void incrementHeapSize(long heapSize) {
+ this.heapSize += heapSize;
+ }
+
+ @Override
+ public void clear() {
+ heapSize = 0l;
+ super.clear();
+ }
+ }
+
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
RegionCoprocessorEnvironment env = c.getEnvironment();
@@ -339,7 +374,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
}
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
- List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
+ MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024);
RegionScanner theScanner = s;
@@ -395,9 +430,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
}
- int batchSize = 0;
- long batchSizeBytes = 0L;
- List<Mutation> mutations = Collections.emptyList();
+ int maxBatchSize = 0;
+ long maxBatchSizeBytes = 0L;
+ MutationList mutations = new MutationList();
boolean needToWrite = false;
Configuration conf = c.getEnvironment().getConfiguration();
long flushSize = region.getTableDesc().getMemStoreFlushSize();
@@ -420,10 +455,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
needToWrite = true;
- // TODO: size better
- mutations = Lists.newArrayListWithExpectedSize(1024);
- batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- batchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+ maxBatchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
+ maxBatchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
}
Aggregators aggregators = ServerAggregators.deserialize(
@@ -666,22 +700,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
mutations.add(put);
}
}
- // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
- List<List<Mutation>> batchMutationList =
- MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations);
- for (List<Mutation> batchMutations : batchMutationList) {
- commit(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
- txState, areMutationInSameRegion, targetHTable, useIndexProto);
- batchMutations.clear();
- }
+ }
+ if (readyToCommit(mutations, maxBatchSize, maxBatchSizeBytes)) {
+ commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+ areMutationInSameRegion, targetHTable, useIndexProto);
mutations.clear();
- // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
- List<List<Mutation>> batchIndexMutationList =
- MutationState.getMutationBatchList(batchSize, batchSizeBytes, indexMutations);
- for (List<Mutation> batchIndexMutations : batchIndexMutationList) {
- commitBatch(region, batchIndexMutations, null, blockingMemStoreSize, null, txState, useIndexProto);
- batchIndexMutations.clear();
- }
+ }
+ // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+
+ if (readyToCommit(indexMutations, maxBatchSize, maxBatchSizeBytes)) {
+ commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
+ useIndexProto);
indexMutations.clear();
}
aggregators.aggregate(rowAggregators, result);
@@ -774,10 +803,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
return false;
}
- private boolean readyToCommit(List<Mutation> mutations,int batchSize){
- return !mutations.isEmpty() && batchSize > 0 &&
- mutations.size() > batchSize;
+ private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) {
+ return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > maxBatchSize)
+ || (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes);
}
+
@Override
public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
final InternalScanner scanner, final ScanType scanType) throws IOException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 262047c..f2820f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1226,21 +1226,63 @@ public class MetaDataClient {
}
throw new IllegalStateException(); // impossible
}
+
+ /**
+ * For new mutations only should not be used if there are deletes done in the data table between start time and end
+ * time passed to the method.
+ */
+ public MutationState buildIndex(PTable index, TableRef dataTableRef, long startTime, long EndTime)
+ throws SQLException {
+ boolean wasAutoCommit = connection.getAutoCommit();
+ try {
+ AlterIndexStatement indexStatement = FACTORY
+ .alterIndex(
+ FACTORY.namedTable(null,
+ TableName.create(index.getSchemaName().getString(),
+ index.getTableName().getString())),
+ dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
+ alterIndex(indexStatement);
+ connection.setAutoCommit(true);
+ MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);
+ Scan scan = mutationPlan.getContext().getScan();
+ try {
+ scan.setTimeRange(startTime, EndTime);
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ MutationState state = connection.getQueryServices().updateData(mutationPlan);
+ indexStatement = FACTORY
+ .alterIndex(
+ FACTORY.namedTable(null,
+ TableName.create(index.getSchemaName().getString(),
+ index.getTableName().getString())),
+ dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
+ alterIndex(indexStatement);
+ return state;
+ } finally {
+ connection.setAutoCommit(wasAutoCommit);
+ }
+ }
+
+ private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
+ MutationPlan mutationPlan;
+ if (index.getIndexType() == IndexType.LOCAL) {
+ PostLocalIndexDDLCompiler compiler =
+ new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
+ mutationPlan = compiler.compile(index);
+ } else {
+ PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
+ mutationPlan = compiler.compile(index);
+ }
+ return mutationPlan;
+ }
private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
AlterIndexStatement indexStatement = null;
boolean wasAutoCommit = connection.getAutoCommit();
try {
connection.setAutoCommit(true);
- MutationPlan mutationPlan;
- if (index.getIndexType() == IndexType.LOCAL) {
- PostLocalIndexDDLCompiler compiler =
- new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
- mutationPlan = compiler.compile(index);
- } else {
- PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
- mutationPlan = compiler.compile(index);
- }
+ MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);
Scan scan = mutationPlan.getContext().getScan();
Long scn = connection.getSCN();
try {