You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/20 03:42:07 UTC
[1/2] git commit: PHOENIX-1186 Pass scan for parallel chunk of work
through to ParallelIteratorFactory
Repository: phoenix
Updated Branches:
refs/heads/master 1fe4af34f -> b5971dae6
PHOENIX-1186 Pass scan for parallel chunk of work through to ParallelIteratorFactory
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5c0a08e5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5c0a08e5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5c0a08e5
Branch: refs/heads/master
Commit: 5c0a08e523a1f66d25c87af3a1fb396c36dc4249
Parents: 1fe4af3
Author: James Taylor <ja...@apache.org>
Authored: Tue Aug 19 16:19:07 2014 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Aug 19 18:39:34 2014 -0700
----------------------------------------------------------------------
.../MutatingParallelIteratorFactory.java | 3 ++-
.../apache/phoenix/execute/AggregatePlan.java | 9 +++++----
.../phoenix/iterate/ChunkedResultIterator.java | 21 ++++++++------------
.../phoenix/iterate/ParallelIterators.java | 4 ++--
.../phoenix/iterate/SpoolingResultIterator.java | 3 ++-
5 files changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index fbfce29..df91b1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -26,6 +26,7 @@ import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
import org.apache.phoenix.iterate.PeekingResultIterator;
@@ -58,7 +59,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException {
final PhoenixConnection connection = new PhoenixConnection(this.connection);
MutationState state = mutate(context, iterator, connection);
long totalRowCount = state.getUpdateCount();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 67c7bb7..d45e036 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.RowProjector;
@@ -94,7 +95,7 @@ public class AggregatePlan extends BasicQueryPlan {
this.services = services;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
Expression expression = RowKeyExpression.INSTANCE;
OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -111,9 +112,9 @@ public class AggregatePlan extends BasicQueryPlan {
this.outerFactory = outerFactory;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
- PeekingResultIterator iterator = innerFactory.newIterator(context, scanner);
- return outerFactory.newIterator(context, iterator);
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
+ PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan);
+ return outerFactory.newIterator(context, iterator, scan);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index cfaca84..38e91bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -18,7 +18,6 @@
package org.apache.phoenix.iterate;
-import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@@ -26,11 +25,11 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
/**
* {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
@@ -58,9 +57,9 @@ public class ChunkedResultIterator implements PeekingResultIterator {
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
scanner.close(); //close the iterator since we don't need it anymore.
- return new ChunkedResultIterator(delegateFactory, context, tableRef,
+ return new ChunkedResultIterator(delegateFactory, context, tableRef, scan,
context.getConnection().getQueryServices().getProps().getLong(
QueryServices.SCAN_RESULT_CHUNK_SIZE,
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
@@ -68,11 +67,11 @@ public class ChunkedResultIterator implements PeekingResultIterator {
}
public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
- StatementContext context, TableRef tableRef, long chunkSize) {
+ StatementContext context, TableRef tableRef, Scan scan, long chunkSize) {
this.delegateIteratorFactory = delegateIteratorFactory;
this.context = context;
this.tableRef = tableRef;
- this.scan = context.getScan();
+ this.scan = scan;
this.chunkSize = chunkSize;
}
@@ -105,18 +104,14 @@ public class ChunkedResultIterator implements PeekingResultIterator {
if (resultIterator == null) {
singleChunkResultIterator = new SingleChunkResultIterator(
new TableResultIterator(context, tableRef, scan), chunkSize);
- resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator);
+ resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
} else if (resultIterator.peek() == null && !singleChunkResultIterator.isEndOfStreamReached()) {
singleChunkResultIterator.close();
- try {
- this.scan = new Scan(scan);
- } catch (IOException e) {
- throw new PhoenixIOException(e);
- }
+ scan = ScanUtil.newScan(scan);
scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0}));
singleChunkResultIterator = new SingleChunkResultIterator(
new TableResultIterator(context, tableRef, scan), chunkSize);
- resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator);
+ resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
}
return resultIterator;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index eb2cf71..687453f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -90,7 +90,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
private final ParallelIteratorFactory iteratorFactory;
public static interface ParallelIteratorFactory {
- PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException;
+ PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
}
private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
@@ -366,7 +366,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
if (logger.isDebugEnabled()) {
logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
}
- return iteratorFactory.newIterator(scanContext, scanner);
+ return iteratorFactory.newIterator(scanContext, scanner, splitScan);
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c0a08e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 1a256de..4672657 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -28,6 +28,7 @@ import java.sql.SQLException;
import java.util.List;
import org.apache.commons.io.output.DeferredFileOutputStream;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.StatementContext;
@@ -63,7 +64,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
this.services = services;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
return new SpoolingResultIterator(scanner, services);
}
[2/2] git commit: PHOENIX-1186 Pass scan for parallel chunk of work
through to ParallelIteratorFactory
Posted by ja...@apache.org.
PHOENIX-1186 Pass scan for parallel chunk of work through to ParallelIteratorFactory
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b5971dae
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b5971dae
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b5971dae
Branch: refs/heads/master
Commit: b5971dae61b36f058d37dbc699548da63a068e4e
Parents: 5c0a08e
Author: James Taylor <ja...@apache.org>
Authored: Tue Aug 19 18:35:07 2014 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Aug 19 18:40:01 2014 -0700
----------------------------------------------------------------------
.../phoenix/compile/StatementContext.java | 24 --------------------
.../phoenix/iterate/ChunkedResultIterator.java | 6 +++++
.../phoenix/iterate/ParallelIterators.java | 5 ++--
3 files changed, 8 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5971dae/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 5bebfd8..1c75527 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -102,30 +102,6 @@ public class StatementContext {
}
/**
- * Copy constructor where an altered scan can be set.
- *
- * @param stmtContext the {@code StatementContext} to be copied
- * @param scan the customized scan
- */
- public StatementContext(StatementContext stmtContext, Scan scan) {
- this.statement = stmtContext.statement;
- this.resolver = stmtContext.resolver;
- this.scan = scan;
- this.sequences = stmtContext.sequences;
- this.binds = stmtContext.binds;
- this.aggregates = stmtContext.aggregates;
- this.expressions = stmtContext.expressions;
- this.dateFormat = stmtContext.dateFormat;
- this.dateFormatter = stmtContext.dateFormatter;
- this.dateParser = stmtContext.dateParser;
- this.numberFormat = stmtContext.numberFormat;
- this.tempPtr = new ImmutableBytesWritable();
- this.currentTable = stmtContext.currentTable;
- this.whereConditionColumns = stmtContext.whereConditionColumns;
- this.dataColumns = stmtContext.getDataColumnsMap();
- }
-
- /**
* build map from dataColumn to what will be its position in single KeyValue value bytes
* returned from the coprocessor that joins from the index row back to the data row.
* @param column
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5971dae/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 38e91bd..d7fbe79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -30,12 +30,15 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
* basic scan plans, to avoid loading large quantities of data from HBase in one go.
*/
public class ChunkedResultIterator implements PeekingResultIterator {
+ private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory;
private SingleChunkResultIterator singleChunkResultIterator;
@@ -59,6 +62,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
@Override
public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
scanner.close(); //close the iterator since we don't need it anymore.
+ if (logger.isDebugEnabled()) logger.debug("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan);
return new ChunkedResultIterator(delegateFactory, context, tableRef, scan,
context.getConnection().getQueryServices().getProps().getLong(
QueryServices.SCAN_RESULT_CHUNK_SIZE,
@@ -102,6 +106,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
private PeekingResultIterator getResultIterator() throws SQLException {
if (resultIterator == null) {
+ if (logger.isDebugEnabled()) logger.debug("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
singleChunkResultIterator = new SingleChunkResultIterator(
new TableResultIterator(context, tableRef, scan), chunkSize);
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
@@ -109,6 +114,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
singleChunkResultIterator.close();
scan = ScanUtil.newScan(scan);
scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0}));
+ if (logger.isDebugEnabled()) logger.debug("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
singleChunkResultIterator = new SingleChunkResultIterator(
new TableResultIterator(context, tableRef, scan), chunkSize);
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5971dae/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 687453f..3d03ddc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -360,13 +360,12 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
@Override
public PeekingResultIterator call() throws Exception {
- StatementContext scanContext = new StatementContext(context, splitScan);
long startTime = System.currentTimeMillis();
- ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan);
+ ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan);
if (logger.isDebugEnabled()) {
logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
}
- return iteratorFactory.newIterator(scanContext, scanner, splitScan);
+ return iteratorFactory.newIterator(context, scanner, splitScan);
}
/**