You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/03/16 21:11:57 UTC

[35/50] [abbrv] phoenix git commit: Higher memory consumption on RS leading to OOM/abort on immutable index creation with multiple regions on single RS

Higher memory consumption on RS leading to OOM/abort on immutable index creation with multiple regions on single RS


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8f6d02f7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8f6d02f7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8f6d02f7

Branch: refs/heads/calcite
Commit: 8f6d02f79871eb8a2458ca7aecfb10b3ebf34e7b
Parents: c8612fa
Author: Ankit Singhal <an...@gmail.com>
Authored: Mon Mar 6 14:58:01 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Mon Mar 6 14:58:01 2017 +0530

----------------------------------------------------------------------
 .../apache/phoenix/compile/UpsertCompiler.java  | 20 ++++-
 .../UngroupedAggregateRegionObserver.java       | 86 +++++++++++++-------
 .../apache/phoenix/schema/MetaDataClient.java   | 60 ++++++++++++--
 3 files changed, 128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7a285a9..260e591 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -24,6 +24,7 @@ import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
@@ -106,6 +107,7 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -756,6 +758,10 @@ public class UpsertCompiler {
                                 Tuple row = iterator.next();
                                 final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
                                         PLong.INSTANCE, ptr);
+                                for (PTable index : getNewIndexes(table)) {
+                                    new MetaDataClient(connection).buildIndex(index, tableRef,
+                                            scan.getTimeRange().getMax(), scan.getTimeRange().getMax() + 1);
+                                }
                                 return new MutationState(maxSize, connection) {
                                     @Override
                                     public long getUpdateCount() {
@@ -767,7 +773,19 @@ public class UpsertCompiler {
                             }
                             
                         }
-    
+
+                        private List<PTable> getNewIndexes(PTable table) throws SQLException {
+                            List<PTable> indexes = table.getIndexes();
+                            List<PTable> newIndexes = new ArrayList<PTable>(2);
+                            PTable newTable = PhoenixRuntime.getTableNoCache(connection, table.getName().getString());
+                            for (PTable index : newTable.getIndexes()) {
+                                if (!indexes.contains(index)) {
+                                    newIndexes.add(index);
+                                }
+                            }
+                            return newIndexes;
+                        }
+
                         @Override
                         public ExplainPlan getExplainPlan() throws SQLException {
                             List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c5854d3..2dec235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -35,7 +35,6 @@ import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -59,6 +58,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -72,7 +72,6 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
@@ -127,6 +126,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
 
 
 /**
@@ -288,6 +288,41 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return s;
     }
 
+    class MutationList extends ArrayList<Mutation> implements HeapSize {
+        private long heapSize = 0l;
+        public MutationList() {
+            super();
+        }
+        
+        public MutationList(int size){
+            super(size);
+        }
+        
+        @Override
+        public boolean add(Mutation e) {
+            boolean r = super.add(e);
+            if (r) {
+                incrementHeapSize(e.heapSize());
+            }
+            return r;
+        }
+
+        @Override
+        public long heapSize() {
+            return heapSize;
+        }
+
+        private void incrementHeapSize(long heapSize) {
+            this.heapSize += heapSize;
+        }
+
+        @Override
+        public void clear() {
+            heapSize = 0l;
+            super.clear();
+        }
+    }
+    
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
         RegionCoprocessorEnvironment env = c.getEnvironment();
@@ -339,7 +374,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         }
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
-        List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
+        MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024);
         
         RegionScanner theScanner = s;
         
@@ -395,9 +430,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
         }
         
-        int batchSize = 0;
-        long batchSizeBytes = 0L;
-        List<Mutation> mutations = Collections.emptyList();
+        int maxBatchSize = 0;
+        long maxBatchSizeBytes = 0L;
+        MutationList mutations = new MutationList();
         boolean needToWrite = false;
         Configuration conf = c.getEnvironment().getConfiguration();
         long flushSize = region.getTableDesc().getMemStoreFlushSize();
@@ -420,10 +455,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
             needToWrite = true;
-            // TODO: size better
-            mutations = Lists.newArrayListWithExpectedSize(1024);
-            batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            batchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+            maxBatchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
+            maxBatchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
         }
         Aggregators aggregators = ServerAggregators.deserialize(
@@ -666,22 +700,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     mutations.add(put);
                                 }
                             }
-                            // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-                            List<List<Mutation>> batchMutationList =
-                                MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations);
-                            for (List<Mutation> batchMutations : batchMutationList) {
-                                commit(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
-                                        txState, areMutationInSameRegion, targetHTable, useIndexProto);
-                                batchMutations.clear();
-                            }
+                        }
+                        if (readyToCommit(mutations, maxBatchSize, maxBatchSizeBytes)) {
+                            commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+                                    areMutationInSameRegion, targetHTable, useIndexProto);
                             mutations.clear();
-                            // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-                            List<List<Mutation>> batchIndexMutationList =
-                                MutationState.getMutationBatchList(batchSize, batchSizeBytes, indexMutations);
-                            for (List<Mutation> batchIndexMutations : batchIndexMutationList) {
-                                commitBatch(region, batchIndexMutations, null, blockingMemStoreSize, null, txState, useIndexProto);
-                                batchIndexMutations.clear();
-                            }
+                        }
+                        // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+
+                        if (readyToCommit(indexMutations, maxBatchSize, maxBatchSizeBytes)) {
+                            commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
+                                    useIndexProto);
                             indexMutations.clear();
                         }
                         aggregators.aggregate(rowAggregators, result);
@@ -774,10 +803,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return false;
     }
 
-    private boolean readyToCommit(List<Mutation> mutations,int batchSize){
-        return !mutations.isEmpty() && batchSize > 0 &&
-        mutations.size() > batchSize;
+    private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) {
+        return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > maxBatchSize)
+                || (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes);
     }
+
     @Override
     public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
             final InternalScanner scanner, final ScanType scanType) throws IOException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 262047c..f2820f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1226,21 +1226,63 @@ public class MetaDataClient {
         }
         throw new IllegalStateException(); // impossible
     }
+    
+    /**
+     * For new mutations only should not be used if there are deletes done in the data table between start time and end
+     * time passed to the method.
+     */
+    public MutationState buildIndex(PTable index, TableRef dataTableRef, long startTime, long EndTime)
+            throws SQLException {
+        boolean wasAutoCommit = connection.getAutoCommit();
+        try {
+            AlterIndexStatement indexStatement = FACTORY
+                    .alterIndex(
+                            FACTORY.namedTable(null,
+                                    TableName.create(index.getSchemaName().getString(),
+                                            index.getTableName().getString())),
+                            dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
+            alterIndex(indexStatement);
+            connection.setAutoCommit(true);
+            MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);
+            Scan scan = mutationPlan.getContext().getScan();
+            try {
+                scan.setTimeRange(startTime, EndTime);
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+            MutationState state = connection.getQueryServices().updateData(mutationPlan);
+            indexStatement = FACTORY
+                    .alterIndex(
+                            FACTORY.namedTable(null,
+                                    TableName.create(index.getSchemaName().getString(),
+                                            index.getTableName().getString())),
+                            dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
+            alterIndex(indexStatement);
+            return state;
+        } finally {
+            connection.setAutoCommit(wasAutoCommit);
+        }
+    }
+
+    private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
+        MutationPlan mutationPlan;
+        if (index.getIndexType() == IndexType.LOCAL) {
+            PostLocalIndexDDLCompiler compiler =
+                    new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
+            mutationPlan = compiler.compile(index);
+        } else {
+            PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
+            mutationPlan = compiler.compile(index);
+        }
+        return mutationPlan;
+    }
 
     private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
         AlterIndexStatement indexStatement = null;
         boolean wasAutoCommit = connection.getAutoCommit();
         try {
             connection.setAutoCommit(true);
-            MutationPlan mutationPlan;
-            if (index.getIndexType() == IndexType.LOCAL) {
-                PostLocalIndexDDLCompiler compiler =
-                        new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
-                mutationPlan = compiler.compile(index);
-            } else {
-                PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
-                mutationPlan = compiler.compile(index);
-            }
+            MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);
             Scan scan = mutationPlan.getContext().getScan();
             Long scn = connection.getSCN();
             try {