Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/21 06:49:09 UTC

[2/2] phoenix git commit: PHOENIX-1466 Prevent multiple scans when query run serially

PHOENIX-1466 Prevent multiple scans when query run serially


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a1ba9bac
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a1ba9bac
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a1ba9bac

Branch: refs/heads/4.2
Commit: a1ba9bace1f44ed418bc01467a10ab2b3eea848b
Parents: 4c47b0e
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Nov 18 10:37:28 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Nov 20 21:48:57 2014 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/QueryWithLimitIT.java       | 119 +++++++++++++++++++
 .../org/apache/phoenix/execute/ScanPlan.java    |  81 +++++++------
 .../DistinctValueWithCountServerAggregator.java |   5 -
 .../iterate/ParallelIteratorFactory.java        |   7 ++
 .../apache/phoenix/iterate/SerialIterators.java |   9 +-
 .../phoenix/iterate/TableResultIterator.java    |  42 +++++--
 6 files changed, 206 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
new file mode 100644
index 0000000..2df9514
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class QueryWithLimitIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(4);
+        // Must update config before starting server
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1));
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        props.put(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, Integer.toString(0)); // Prevents RejectedExecutionException when deleting sequences
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testQueryWithLimitAndStats() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            ensureTableCreated(getUrl(),KEYONLY_NAME);
+            initTableValues(conn, 100);
+            
+            String query = "SELECT i1 FROM KEYONLY LIMIT 1";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" + 
+                    "    SERVER FILTER BY PageFilter 1\n" + 
+                    "    SERVER 1 ROW LIMIT\n" + 
+                    "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testQueryWithoutLimitFails() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+
+        ensureTableCreated(getUrl(),KEYONLY_NAME);
+        initTableValues(conn, 100);
+        conn.createStatement().execute("UPDATE STATISTICS " + KEYONLY_NAME);
+        
+        String query = "SELECT i1 FROM KEYONLY";
+        try {
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            rs.next();
+            fail();
+        } catch (SQLException e) {
+            assertTrue(e.getCause() instanceof RejectedExecutionException);
+        }
+        conn.close();
+    }
+    
+    protected static void initTableValues(Connection conn, int nRows) throws Exception {
+        PreparedStatement stmt = conn.prepareStatement(
+            "upsert into " +
+            "KEYONLY VALUES (?, ?)");
+        for (int i = 0; i < nRows; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i+1);
+            stmt.execute();
+        }
+        
+        conn.commit();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 4ca0338..d82e8f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -53,6 +53,7 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,10 +73,10 @@ public class ScanPlan extends BaseQueryPlan {
     private List<List<Scan>> scans;
     private boolean allowPageFilter;
 
-    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
+    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                        buildResultIteratorFactory(context, table, orderBy));
+                        buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter));
         this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
@@ -84,9 +85,51 @@ public class ScanPlan extends BaseQueryPlan {
         }
     }
 
+    private static boolean isSerial(StatementContext context,
+            TableRef tableRef, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
+        Scan scan = context.getScan();
+        /*
+         * If a limit is provided and we have no filter, run the scan serially when we estimate that
+         * the limit's worth of data will fit into a single region.
+         */
+        boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+        Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+        if (perScanLimit == null || scan.getFilter() != null) {
+            return false;
+        }
+        PTable table = tableRef.getTable();
+        GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
+        long estRowSize = SchemaUtil.estimateRowSize(table);
+        long estRegionSize;
+        if (gpsInfo == null) {
+            // Use guidepost depth as minimum size
+            ConnectionQueryServices services = context.getConnection().getQueryServices();
+            HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes());
+            int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+            long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+            estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+        } else {
+            // Region size estimated based on total number of bytes divided by number of regions
+            estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
+        }
+        // TODO: configurable number of bytes?
+        boolean isSerial = (perScanLimit * estRowSize < estRegionSize);
+        
+        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("With LIMIT=" + perScanLimit
+                + ", estimated row size=" + estRowSize
+                + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)"
+                + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution", context.getConnection()));
+        return isSerial;
+    }
+    
     private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
-            TableRef table, OrderBy orderBy) {
+            TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
 
+        if (isSerial(context, table, orderBy, limit, allowPageFilter)) {
+            return ParallelIteratorFactory.NOOP_FACTORY;
+        }
         ParallelIteratorFactory spoolingResultIteratorFactory =
                 new SpoolingResultIterator.SpoolingResultIteratorFactory(
                         context.getConnection().getQueryServices());
@@ -125,38 +168,8 @@ public class ScanPlan extends BaseQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        boolean isSerial = false;
+        boolean isSerial = isSerial(context, tableRef, orderBy, limit, allowPageFilter);
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
-        /*
-         * If a limit is provided and we have no filter, run the scan serially when we estimate that
-         * the limit's worth of data will fit into a single region.
-         */
-        if (perScanLimit != null && scan.getFilter() == null) {
-        	GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
-            long estRowSize = SchemaUtil.estimateRowSize(table);
-        	long estRegionSize;
-        	if (gpsInfo == null) {
-        	    // Use guidepost depth as minimum size
-        	    ConnectionQueryServices services = context.getConnection().getQueryServices();
-        	    HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes());
-                int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
-                long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
-        	    estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
-        	} else {
-        		// Region size estimated based on total number of bytes divided by number of regions
-        	    estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
-        	}
-            // TODO: configurable number of bytes?
-            if (perScanLimit * estRowSize < estRegionSize) {
-                isSerial = true;
-            }
-            if (logger.isDebugEnabled()) logger.debug("With LIMIT=" + perScanLimit 
-                    + ", estimated row size=" + estRowSize 
-                    + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)" 
-                    + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution");
-        }
         ResultIterators iterators;
         if (isSerial) {
         	iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
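
For illustration, the serial-vs-parallel decision in isSerial() above reduces to a
single comparison: run the scan serially when the limit's worth of rows is estimated
to fit inside one region. A minimal standalone sketch of that comparison follows; the
class name and the byte counts are illustrative assumptions, not values from this
commit:

    // Sketch of the heuristic in ScanPlan.isSerial(): serial execution wins when
    // LIMIT * estimated-row-size is smaller than the estimated region size.
    public class SerialScanHeuristicSketch {
        static boolean isSerial(long perScanLimit, long estRowSize, long estRegionSize) {
            return perScanLimit * estRowSize < estRegionSize;
        }

        public static void main(String[] args) {
            // SELECT i1 FROM KEYONLY LIMIT 1, assuming a 100-byte row and a 20 MB
            // region estimate: 1 * 100 < 20971520, so the scan runs serially.
            System.out.println(isSerial(1, 100, 20971520L));        // true  -> SERIAL
            // A large enough LIMIT flips the decision back to parallel execution.
            System.out.println(isSerial(1000000, 100, 20971520L));  // false -> PARALLEL
        }
    }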

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index a3141b1..3a1789b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -24,9 +24,6 @@ import java.util.Map.Entry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -35,7 +32,6 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SizedUtil;
-
 import org.iq80.snappy.Snappy;
 
 /**
@@ -45,7 +41,6 @@ import org.iq80.snappy.Snappy;
  * @since 1.2.1
  */
 public class DistinctValueWithCountServerAggregator extends BaseAggregator {
-    private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
     public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
     public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index 1ad3af0..df8f658 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -23,5 +23,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.StatementContext;
 
 public interface ParallelIteratorFactory {
+    public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
+        @Override
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan)
+                throws SQLException {
+            return LookAheadResultIterator.wrap(scanner);
+        }
+    };
     PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
 }
\ No newline at end of file
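
The NOOP_FACTORY above is a null-object implementation: callers can invoke the factory
unconditionally, and the no-op variant simply passes the scanner through. A
framework-free sketch of the same idea, using plain java.util.Iterator as a stand-in
for Phoenix's iterator types (not Phoenix code):

    import java.util.Arrays;
    import java.util.Iterator;

    interface IteratorFactory {
        Iterator<String> newIterator(Iterator<String> scanner);

        // Null-object instance: performs no spooling or buffering, returns input as-is.
        IteratorFactory NOOP = new IteratorFactory() {
            @Override
            public Iterator<String> newIterator(Iterator<String> scanner) {
                return scanner;
            }
        };
    }

    class NoopFactoryDemo {
        public static void main(String[] args) {
            Iterator<String> it = IteratorFactory.NOOP.newIterator(
                    Arrays.asList("a", "b").iterator());
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }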

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 4be7b56..c01a268 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -49,14 +49,12 @@ public class SerialIterators extends BaseResultIterators {
 	private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
 	private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
-    private final int limit;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
             throws SQLException {
         super(plan, perScanLimit);
         Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
         this.iteratorFactory = iteratorFactory;
-        this.limit = perScanLimit;
     }
 
     @Override
@@ -88,9 +86,8 @@ public class SerialIterators extends BaseResultIterators {
 	                    concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
                 	}
                 	PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
-                	PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit);
-                    allIterators.add(iterator);
-                    return iterator;
+                    allIterators.add(concatIterator);
+                    return concatIterator;
                 }
 
                 /**
@@ -102,7 +99,7 @@ public class SerialIterators extends BaseResultIterators {
                 public Object getJobId() {
                     return SerialIterators.this;
                 }
-            }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
+            }, "Serial scanner for table: " + tableRef.getTable().getName().getString()));
             // Add our singleton Future which will execute serially
             nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 58abec5..9cc4ad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
-
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -40,28 +39,48 @@ import org.apache.phoenix.util.ServerUtil;
  * @since 0.1
  */
 public class TableResultIterator extends ExplainTable implements ResultIterator {
+    private final Scan scan;
     private final HTableInterface htable;
-    private final ResultIterator delegate;
+    private volatile ResultIterator delegate;
 
     public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException {
         this(context, tableRef, context.getScan());
     }
 
+    /*
+     * Delay the creation of the underlying HBase ResultScanner until the first call to
+     * next(). Though no rows are returned when the scanner is opened, opening it still
+     * makes several RPCs. For queries run serially (e.g. SELECT ... LIMIT 1), we do not
+     * want to pay this cost for scanners that will likely never be executed.
+     */
+    private ResultIterator getDelegate(boolean isClosing) throws SQLException {
+        ResultIterator delegate = this.delegate;
+        if (delegate == null) {
+            synchronized (this) {
+                delegate = this.delegate;
+                if (delegate == null) {
+                    try {
+                        this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan));
+                    } catch (IOException e) {
+                        Closeables.closeQuietly(htable);
+                        throw ServerUtil.parseServerException(e);
+                    }
+                }
+            }
+        }
+        return delegate;
+    }
+    
     public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException {
         super(context, tableRef);
+        this.scan = scan;
         htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes());
-        try {
-            delegate = new ScanningResultIterator(htable.getScanner(scan));
-        } catch (IOException e) {
-            Closeables.closeQuietly(htable);
-            throw ServerUtil.parseServerException(e);
-        }
     }
 
     @Override
     public void close() throws SQLException {
         try {
-            delegate.close();
+            getDelegate(true).close();
         } finally {
             try {
                 htable.close();
@@ -73,7 +92,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
 
     @Override
     public Tuple next() throws SQLException {
-        return delegate.next();
+        return getDelegate(false).next();
     }
 
     @Override
@@ -84,7 +103,6 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
 
 	@Override
 	public String toString() {
-		return "TableResultIterator [htable=" + htable + ", delegate="
-				+ delegate + "]";
+		return "TableResultIterator [htable=" + htable + ", scan=" + scan  + "]";
 	}
 }
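
The getDelegate() change above is a standard lazy-initialization idiom: a volatile
field plus double-checked locking, so the RPCs that open an HBase scanner are deferred
until next() actually needs rows. A generic sketch of the idiom (the Supplier interface
here stands in for opening the scanner; this is not Phoenix code):

    // Volatile double-checked lazy initialization, as used by getDelegate().
    public class LazyDelegate<T> {
        public interface Supplier<T> { T get(); }   // JDK 7-friendly stand-in

        private volatile T delegate;                // volatile: safe cross-thread publication
        private final Supplier<T> factory;

        public LazyDelegate(Supplier<T> factory) {
            this.factory = factory;
        }

        public T get() {
            T local = delegate;                     // single volatile read on the fast path
            if (local == null) {
                synchronized (this) {
                    local = delegate;               // re-check while holding the lock
                    if (local == null) {
                        delegate = local = factory.get();
                    }
                }
            }
            return local;
        }
    }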