You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/17 20:40:28 UTC

[2/2] asterixdb git commit: [ASTERIXDB-2249][API] Add Max Result Reads to API

[ASTERIXDB-2249][API] Add Max Result Reads to API

- user model changes: no
- storage format changes: no
- interface changes: yes
  - IRequestParameters: add ResultProperties
  - IDatasetPartitionManager: add maxReads

Details:
- Add option to specify max result reads and default
  it to 1.
- Fix exception handling in DatasetPartitionReader.
- Add option to specify maxResultReads in tests.
- Use new option in async-repeated test.
- Add test case for exhausted result.

Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2292
Reviewed-by: Michael Blow <mb...@apache.org>
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/92f7cb58
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/92f7cb58
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/92f7cb58

Branch: refs/heads/master
Commit: 92f7cb582a9f579d15068e8aa5b10639c59bb441
Parents: a49c7cc
Author: Murtadha Hubail <mh...@apache.org>
Authored: Wed Jan 17 21:26:35 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Wed Jan 17 12:39:52 2018 -0800

----------------------------------------------------------------------
 .../asterix/translator/IRequestParameters.java  |   6 +-
 .../asterix/translator/ResultProperties.java    |  53 ++++++++++
 .../asterix/api/http/server/ApiServlet.java     |   3 +-
 .../api/http/server/NCQueryServiceServlet.java  |  11 +-
 .../api/http/server/QueryServiceServlet.java    |  25 +++--
 .../asterix/api/http/server/RestApiServlet.java |   4 +-
 .../asterix/api/java/AsterixJavaClient.java     |   5 +-
 .../message/ExecuteStatementRequestMessage.java |  10 +-
 .../asterix/app/translator/QueryTranslator.java |   5 +-
 .../app/translator/RequestParameters.java       |  12 +--
 .../asterix/test/common/TestExecutor.java       |  18 +++-
 .../async-deferred/AsyncDeferredQueries.xml     |   6 ++
 .../async-exhausted-result.1.async.sqlpp        |  23 +++++
 .../async-exhausted-result.2.pollget.uri        |  23 +++++
 .../async-exhausted-result.3.get.uri            |  20 ++++
 .../async-exhausted-result.4.get.uri            |  20 ++++
 .../async-repeated/async-repeated.1.async.sqlpp |   1 +
 .../async-exhausted-result.1.ignore             |   0
 .../async-exhausted-result.2.regex              |   2 +
 .../async-exhausted-result.3.json               |  10 ++
 .../metadata/declared/MetadataProvider.java     |  11 +-
 .../api/dataset/IDatasetPartitionManager.java   |   2 +-
 .../comm/channels/NetworkOutputChannel.java     |   3 +-
 .../nc/dataset/DatasetPartitionManager.java     |   4 +-
 .../nc/dataset/DatasetPartitionReader.java      | 103 ++++++++++---------
 .../nc/dataset/DatasetPartitionWriter.java      |   4 +-
 .../hyracks/control/nc/dataset/ResultState.java |  20 +++-
 .../result/ResultWriterOperatorDescriptor.java  |   9 +-
 .../tests/integration/AggregationTest.java      |   2 +-
 .../tests/integration/CountOfCountsTest.java    |   6 +-
 .../tests/integration/HeapSortMergeTest.java    |   2 +-
 .../integration/LocalityAwareConnectorTest.java |   2 +-
 .../integration/ReplicateOperatorTest.java      |   2 +-
 .../tests/integration/ScanPrintTest.java        |   6 +-
 .../tests/integration/SortMergeTest.java        |   4 +-
 .../TPCHCustomerOrderHashJoinTest.java          |  16 +--
 .../TPCHCustomerOrderNestedLoopJoinTest.java    |   8 +-
 .../hyracks/tests/integration/UnionTest.java    |   2 +-
 38 files changed, 345 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 8d0f20b..a1fbac6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -31,9 +31,11 @@ public interface IRequestParameters {
     IHyracksDataset getHyracksDataset();
 
     /**
-     * @return The {@code ResultDelivery} kind required for queries in the list of statements
+     * Gets the required result properties of the request.
+     *
+     * @return the result properties
      */
-    IStatementExecutor.ResultDelivery getResultDelivery();
+    ResultProperties getResultProperties();
 
     /**
      * @return a reference to write the stats of executed queries

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
new file mode 100644
index 0000000..4866c6d
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.io.Serializable;
+
+public class ResultProperties implements Serializable {
+
+    public static final long DEFAULT_MAX_READS = 1;
+    private final IStatementExecutor.ResultDelivery delivery;
+    private final long maxReads;
+
+    public ResultProperties(IStatementExecutor.ResultDelivery delivery) {
+        this(delivery, DEFAULT_MAX_READS);
+    }
+
+    public ResultProperties(IStatementExecutor.ResultDelivery delivery, long maxReads) {
+        this.delivery = delivery;
+        this.maxReads = maxReads;
+    }
+
+    public IStatementExecutor.ResultDelivery getDelivery() {
+        return delivery;
+    }
+
+    public long getMaxReads() {
+        return maxReads;
+    }
+
+    public ResultProperties getNcToCcResultProperties() {
+        if (delivery != IStatementExecutor.ResultDelivery.IMMEDIATE) {
+            return this;
+        }
+        // switch IMMEDIATE to DEFERRED since the result will be served by the NC
+        return new ResultProperties(IStatementExecutor.ResultDelivery.DEFERRED, maxReads);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 87c1c57..df2a2a1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -48,6 +48,7 @@ import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
 import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -164,7 +165,7 @@ public class ApiServlet extends AbstractServlet {
             double duration;
             long startTime = System.currentTimeMillis();
             final IRequestParameters requestParameters =
-                    new RequestParameters(hds, IStatementExecutor.ResultDelivery.IMMEDIATE,
+                    new RequestParameters(hds, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
                             new IStatementExecutor.Stats(), null, null, null);
             translator.compileAndExecute(hcc, null, requestParameters);
             long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 83a40f0..76f489c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -41,6 +41,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -66,14 +67,12 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
 
     @Override
     protected void executeStatement(String statementsText, SessionOutput sessionOutput,
-            IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+            ResultProperties resultProperties, IStatementExecutor.Stats stats, RequestParameters param,
             RequestExecutionState execution, Map<String, String> optionalParameters) throws Exception {
         // Running on NC -> send 'execute' message to CC
         INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
         INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
-        IStatementExecutor.ResultDelivery ccDelivery =
-                delivery == IStatementExecutor.ResultDelivery.IMMEDIATE ? IStatementExecutor.ResultDelivery.DEFERRED
-                        : delivery;
+        final IStatementExecutor.ResultDelivery delivery = resultProperties.getDelivery();
         ExecuteStatementResponseMessage responseMsg;
         MessageFuture responseFuture = ncMb.registerMessageFuture();
         final String handleUrl = getHandleUrl(param.host, param.path, delivery);
@@ -86,8 +85,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
                 timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
             }
             ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
-                    responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery,
-                    param.clientContextID, handleUrl, optionalParameters);
+                    responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(),
+                    resultProperties.getNcToCcResultProperties(), param.clientContextID, handleUrl, optionalParameters);
             execution.start();
             ncMb.sendMessageToCC(requestMsg);
             try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index a8f14b5..23a7ba7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -21,7 +21,6 @@ package org.apache.asterix.api.http.server;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +47,7 @@ import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -137,7 +137,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         PRETTY("pretty"),
         MODE("mode"),
         TIMEOUT("timeout"),
-        PLAN_FORMAT("plan-format");
+        PLAN_FORMAT("plan-format"),
+        MAX_RESULT_READS("max-result-reads");
 
         private final String str;
 
@@ -193,6 +194,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         boolean pretty;
         String clientContextID;
         String mode;
+        String maxResultReads;
 
         @Override
         public String toString() {
@@ -207,6 +209,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
                 on.put("clientContextID", clientContextID);
                 on.put("format", format);
                 on.put("timeout", timeout);
+                on.put("maxResultReads", maxResultReads);
                 return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on);
             } catch (JsonProcessingException e) { // NOSONAR
                 return e.getMessage();
@@ -383,6 +386,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
                 param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str()));
                 param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str());
                 param.timeout = getOptText(jsonRequest, Parameter.TIMEOUT.str());
+                param.maxResultReads = getOptText(jsonRequest, Parameter.MAX_RESULT_READS.str());
             } catch (JsonParseException | JsonMappingException e) {
                 // if the JSON parsing fails, the statement is empty and we get an empty statement error
                 GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e);
@@ -397,6 +401,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
             param.mode = toLower(request.getParameter(Parameter.MODE.str()));
             param.clientContextID = request.getParameter(Parameter.CLIENT_ID.str());
             param.timeout = request.getParameter(Parameter.TIMEOUT.str());
+            param.maxResultReads = request.getParameter(Parameter.MAX_RESULT_READS.str());
         }
         return param;
     }
@@ -448,6 +453,10 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
 
         ResultDelivery delivery = parseResultDelivery(param.mode);
 
+        final ResultProperties resultProperties = param.maxResultReads == null ?
+                new ResultProperties(delivery) :
+                new ResultProperties(delivery, Long.parseLong(param.maxResultReads));
+
         String handleUrl = getHandleUrl(param.host, param.path, delivery);
         SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter);
         SessionConfig sessionConfig = sessionOutput.config();
@@ -478,7 +487,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
                     "http://" + hostName + ":" + appCtx.getExternalProperties().getQueryWebInterfacePort());
             response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
             response.setStatus(execution.getHttpStatus());
-            executeStatement(statementsText, sessionOutput, delivery, stats, param, execution, optionalParams);
+            executeStatement(statementsText, sessionOutput, resultProperties, stats, param, execution, optionalParams);
             if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
                 ResultUtil.printStatus(sessionOutput, execution.getResultStatus());
             }
@@ -502,9 +511,9 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         }
     }
 
-    protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
-            IStatementExecutor.Stats stats, RequestParameters param, RequestExecutionState execution,
-            Map<String, String> optionalParameters) throws Exception {
+    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+            ResultProperties resultProperties, IStatementExecutor.Stats stats, RequestParameters param,
+            RequestExecutionState execution, Map<String, String> optionalParameters) throws Exception {
         IClusterManagementWork.ClusterState clusterState =
                 ((ICcApplicationContext) appCtx).getClusterStateManager().getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -518,8 +527,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
                 sessionOutput, compilationProvider, componentProvider);
         execution.start();
         final IRequestParameters requestParameters =
-                new org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), delivery, stats, null,
-                        param.clientContextID, optionalParameters);
+                new org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), resultProperties, stats,
+                        null, param.clientContextID, optionalParameters);
         translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters);
         execution.end();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 3359b9f..360c522 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -42,6 +42,7 @@ import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
 import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -209,7 +210,8 @@ public abstract class RestApiServlet extends AbstractServlet {
             IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
             final IRequestParameters requestParameters =
-                    new RequestParameters(hds, resultDelivery, new IStatementExecutor.Stats(), null, null, null);
+                    new RequestParameters(hds, new ResultProperties(resultDelivery), new IStatementExecutor.Stats(),
+                            null, null, null);
             translator.compileAndExecute(hcc, null, requestParameters);
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index 58a7f09..4ecd978 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -35,6 +35,7 @@ import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
 import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -120,8 +121,8 @@ public class AsterixJavaClient {
         IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
         final IRequestParameters requestParameters =
-                new RequestParameters(null, IStatementExecutor.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats(),
-                        null, null, null);
+                new RequestParameters(null, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
+                        new IStatementExecutor.Stats(), null, null, null);
         translator.compileAndExecute(hcc, null, requestParameters);
         writer.flush();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 0b8c34c..5b0eb97 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -46,6 +46,7 @@ import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -69,20 +70,20 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
     private final ILangExtension.Language lang;
     private final String statementsText;
     private final SessionConfig sessionConfig;
-    private final IStatementExecutor.ResultDelivery delivery;
+    private final ResultProperties resultProperties;
     private final String clientContextID;
     private final String handleUrl;
     private final Map<String, String> optionalParameters;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
-            String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery,
+            String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
             String clientContextID, String handleUrl, Map<String, String> optionalParameters) {
         this.requestNodeId = requestNodeId;
         this.requestMessageId = requestMessageId;
         this.lang = lang;
         this.statementsText = statementsText;
         this.sessionConfig = sessionConfig;
-        this.delivery = delivery;
+        this.resultProperties = resultProperties;
         this.clientContextID = clientContextID;
         this.handleUrl = handleUrl;
         this.optionalParameters = optionalParameters;
@@ -122,7 +123,8 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
                     compilationProvider, storageComponentProvider);
             final IStatementExecutor.Stats stats = new IStatementExecutor.Stats();
             final IRequestParameters requestParameters =
-                    new RequestParameters(null, delivery, stats, outMetadata, clientContextID, optionalParameters);
+                    new RequestParameters(null, resultProperties, stats, outMetadata, clientContextID,
+                            optionalParameters);
             translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters);
             outPrinter.close();
             responseMsg.setResult(outWriter.toString());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 9b96883..4e9cb47 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -283,7 +283,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
         Map<String, String> config = new HashMap<>();
         final IHyracksDataset hdc = requestParameters.getHyracksDataset();
-        final ResultDelivery resultDelivery = requestParameters.getResultDelivery();
+        final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
+        final long maxResultReads = requestParameters.getResultProperties().getMaxReads();
         final Stats stats = requestParameters.getStats();
         final ResultMetadata outMetadata = requestParameters.getOutMetadata();
         final String clientContextId = requestParameters.getClientContextId();
@@ -351,6 +352,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                             metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                             metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
                                     || resultDelivery == ResultDelivery.DEFERRED);
+                            metadataProvider.setMaxResultReads(maxResultReads);
                         }
                         handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, outMetadata,
                                 stats, false, clientContextId);
@@ -386,6 +388,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
+                        metadataProvider.setMaxResultReads(maxResultReads);
                         handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, outMetadata, stats,
                                 clientContextId, ctx);
                         break;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index 5b8da8b..9592492 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -22,24 +22,24 @@ import java.util.Map;
 
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 
 public class RequestParameters implements IRequestParameters {
 
     private final IHyracksDataset hdc;
-    private final ResultDelivery resultDelivery;
+    private final ResultProperties resultProperties;
     private final Stats stats;
     private final Map<String, String> optionalParameters;
     private final IStatementExecutor.ResultMetadata outMetadata;
     private final String clientContextId;
 
-    public RequestParameters(IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+    public RequestParameters(IHyracksDataset hdc, ResultProperties resultProperties, Stats stats,
             IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
             Map<String, String> optionalParameters) {
         this.hdc = hdc;
-        this.resultDelivery = resultDelivery;
+        this.resultProperties = resultProperties;
         this.stats = stats;
         this.outMetadata = outMetadata;
         this.clientContextId = clientContextId;
@@ -52,8 +52,8 @@ public class RequestParameters implements IRequestParameters {
     }
 
     @Override
-    public IStatementExecutor.ResultDelivery getResultDelivery() {
-        return resultDelivery;
+    public ResultProperties getResultProperties() {
+        return resultProperties;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 30336d1..222e098 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -57,6 +57,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
+import org.apache.asterix.api.http.server.QueryServiceServlet;
 import org.apache.asterix.app.external.IExternalUDFLibrarian;
 import org.apache.asterix.common.api.Duration;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -122,7 +123,7 @@ public class TestExecutor {
     private static final Pattern HTTP_PARAM_PATTERN = Pattern.compile("param (\\w+)=(.*)", Pattern.MULTILINE);
     private static final Pattern HTTP_BODY_PATTERN = Pattern.compile("body=(.*)", Pattern.MULTILINE);
     private static final Pattern HTTP_STATUSCODE_PATTERN = Pattern.compile("statuscode (.*)", Pattern.MULTILINE);
-
+    private static final Pattern MAX_RESULT_READS_PATTERN = Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
     public static final int TRUNCATE_THRESHOLD = 16384;
 
     public static final String DELIVERY_ASYNC = "async";
@@ -556,7 +557,12 @@ public class TestExecutor {
 
     public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
             boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
-        final List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
+        List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
+        final Optional<String> maxReadsOptional = extractMaxResultReads(str);
+        if (maxReadsOptional.isPresent()) {
+            newParams = upsertParam(newParams, QueryServiceServlet.Parameter.MAX_RESULT_READS.str(),
+                    maxReadsOptional.get());
+        }
         HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams)
                 : constructPostMethodUrl(str, uri, "statement", newParams);
         // Set accepted output response type
@@ -1392,6 +1398,14 @@ public class TestExecutor {
         return tmpStmt;
     }
 
+    /** Returns the digits of the first {@code maxresultreads=<n>} directive in the statement, if any. */
+    protected static Optional<String> extractMaxResultReads(String statement) {
+        final Matcher m = MAX_RESULT_READS_PATTERN.matcher(statement);
+        // Only the first directive in the statement is used, so a single find() is enough and
+        // no loop over further matches is required.
+        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
+    }
+
     protected static Optional<String> extractBody(String statement) {
         final Matcher m = HTTP_BODY_PATTERN.matcher(statement);
         while (m.find()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
index 9a2ad3c..fe030a8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -49,4 +49,10 @@
             <output-dir compare="Text">async-running</output-dir>
         </compilation-unit>
     </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-exhausted-result">
+            <output-dir compare="Text">async-exhausted-result</output-dir>
+            <expected-error>Job Failed</expected-error>
+        </compilation-unit>
+    </test-case>
 </test-group>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp
new file mode 100644
index 0000000..f8ec2cf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- maxresultreads=1
+-- handlevariable=status
+
+select i, i * i as i2 from range(1, 10) i;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri
new file mode 100644
index 0000000..bca879b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- polltimeoutsecs=20
+-- handlevariable=result
+
+$status

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri
new file mode 100644
index 0000000..b613531
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+$result

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri
new file mode 100644
index 0000000..b613531
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+$result

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
index 1e18f66..8055915 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+-- maxresultreads=2
 -- handlevariable=status
 
 select i, i * i as i2 from range(1, 10) i;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex
new file mode 100644
index 0000000..4308ba2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex
@@ -0,0 +1,2 @@
+/"status": "success"/
+/"handle": ".*"/

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b8790e5..6f58b0a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -153,6 +153,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
     private boolean asyncResults;
+    private long maxResultReads;
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     private TxnId txnId;
@@ -238,6 +239,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         this.asyncResults = asyncResults;
     }
 
+    public void setMaxResultReads(long maxResultReads) {
+        this.maxResultReads = maxResultReads;
+    }
+
+    public long getMaxResultReads() {
+        return maxResultReads;
+    }
+
     public ResultSetId getResultSetId() {
         return resultSetId;
     }
@@ -536,7 +545,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
                     .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
             resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
-                    resultSerializedAppenderFactory);
+                    resultSerializedAppenderFactory, getMaxResultReads());
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index 008f0be..e6cf6d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -25,7 +25,7 @@ import org.apache.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager extends IDatasetManager {
     IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
-            boolean asyncMode, int partition, int nPartitions) throws HyracksException;
+            boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException;
 
     void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             boolean orderedResult, boolean emptyResult) throws HyracksException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 60e2e35..56c4576 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -52,7 +52,8 @@ public class NetworkOutputChannel implements IFrameWriter {
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    public void open() {
+        // no op
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index bc980e1..d381a67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -70,12 +70,12 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
 
     @Override
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
-            boolean asyncMode, int partition, int nPartitions) throws HyracksException {
+            boolean asyncMode, int partition, int nPartitions, long maxReads) {
         DatasetPartitionWriter dpw;
         JobId jobId = ctx.getJobletContext().getJobId();
         synchronized (this) {
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
-                    datasetMemoryManager, fileFactory);
+                    datasetMemoryManager, fileFactory, maxReads);
 
             ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
index ec33b05..24edeb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.partitions.ResultSetPartitionId;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -31,11 +32,8 @@ public class DatasetPartitionReader {
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final DatasetPartitionManager datasetPartitionManager;
-
     private final DatasetMemoryManager datasetMemoryManager;
-
     private final Executor executor;
-
     private final ResultState resultState;
 
     public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager,
@@ -47,56 +45,67 @@ public class DatasetPartitionReader {
     }
 
     public void writeTo(final IFrameWriter writer) {
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                NetworkOutputChannel channel = (NetworkOutputChannel) writer;
-                channel.setFrameSize(resultState.getFrameSize());
-                try {
-                    resultState.readOpen();
-                    channel.open();
-                    try {
-                        long offset = 0;
-                        ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
-                        while (true) {
-                            buffer.clear();
-                            long size = read(offset, buffer);
-                            if (size <= 0) {
-                                break;
-                            } else if (size < buffer.limit()) {
-                                throw new HyracksDataException("Premature end of file - readSize: " + size
-                                        + " buffer limit: " + buffer.limit());
-                            }
-                            offset += size;
-                            buffer.flip();
-                            channel.nextFrame(buffer);
-                        }
-                        LOGGER.info("Result Reader read + " + offset + " bytes");
-                    } finally {
-                        channel.close();
-                        resultState.readClose();
-                        // If the query is a synchronous query, remove its partition as soon as it is read.
-                        if (!resultState.getAsyncMode()) {
-                            datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(),
-                                    resultState.getResultSetPartitionId().getResultSetId(), resultState
-                                            .getResultSetPartitionId().getPartition());
-                        }
+        executor.execute(new ResultPartitionSender((NetworkOutputChannel) writer));
+    }
+
+    private class ResultPartitionSender implements Runnable {
+
+        private final NetworkOutputChannel channel;
+
+        ResultPartitionSender(final NetworkOutputChannel channel) {
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            channel.setFrameSize(resultState.getFrameSize());
+            channel.open();
+            try {
+                resultState.readOpen();
+                long offset = 0;
+                final ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
+                while (true) {
+                    buffer.clear();
+                    final long size = read(offset, buffer);
+                    if (size <= 0) {
+                        break;
+                    } else if (size < buffer.limit()) {
+                        throw new IllegalStateException(
+                                "Premature end of file - readSize: " + size + " buffer limit: " + buffer.limit());
                     }
-                } catch (HyracksDataException e) {
-                    throw new RuntimeException(e);
+                    offset += size;
+                    buffer.flip();
+                    channel.nextFrame(buffer);
                 }
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("result reading successful(" + resultState.getResultSetPartitionId() + ")");
                 }
+            } catch (Exception e) {
+                LOGGER.error(() -> "failed to send result partition " + resultState.getResultSetPartitionId(), e);
+                channel.abort();
+            } finally {
+                close();
             }
+        }
+
+        private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+            return datasetMemoryManager != null ?
+                    resultState.read(datasetMemoryManager, offset, buffer) :
+                    resultState.read(offset, buffer);
+        }
 
-            private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
-                if (datasetMemoryManager == null) {
-                    return resultState.read(offset, buffer);
-                } else {
-                    return resultState.read(datasetMemoryManager, offset, buffer);
+        private void close() {
+            try {
+                channel.close();
+                resultState.readClose();
+                if (resultState.isExhausted()) {
+                    final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId();
+                    datasetPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(),
+                            partitionId.getPartition());
                 }
+            } catch (HyracksDataException e) {
+                LOGGER.error("unexpected failure in partition reader clean up", e);
             }
-        });
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 2bf5326..d49a1a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -59,7 +59,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
             ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
-            DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
+            DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
@@ -70,7 +70,7 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
         resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory,
-                ctx.getInitialFrameSize());
+                ctx.getInitialFrameSize(), maxReads);
     }
 
     public ResultState getResultState() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index 43b1e9b..3e3f06b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -68,17 +68,22 @@ public class ResultState implements IStateObject {
     private long size;
 
     private long persistentSize;
+    private long remainingReads;
 
     ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
-            IWorkspaceFileFactory fileFactory, int frameSize) {
+            IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) {
+        if (maxReads <= 0) {
+            throw new IllegalArgumentException("maxReads must be > 0");
+        }
         this.resultSetPartitionId = resultSetPartitionId;
         this.asyncMode = asyncMode;
         this.ioManager = ioManager;
         this.fileFactory = fileFactory;
         this.frameSize = frameSize;
+        remainingReads = maxReads;
         eos = new AtomicBoolean(false);
         failed = new AtomicBoolean(false);
-        localPageList = new ArrayList<Page>();
+        localPageList = new ArrayList<>();
 
         fileRef = null;
         writeFileHandle = null;
@@ -102,6 +107,7 @@ public class ResultState implements IStateObject {
         closeWriteFileHandle();
         if (fileRef != null) {
             fileRef.delete();
+            fileRef = null;
         }
     }
 
@@ -152,7 +158,10 @@ public class ResultState implements IStateObject {
     }
 
     public synchronized void readOpen() {
-        // It is a noOp for now, leaving here to keep the API stable for future usage.
+        if (isExhausted()) {
+            throw new IllegalStateException("Result reads exhausted");
+        }
+        remainingReads--;
     }
 
     public synchronized void readClose() throws HyracksDataException {
@@ -339,6 +348,7 @@ public class ResultState implements IStateObject {
             ObjectNode on = om.createObjectNode();
             on.put("rspid", resultSetPartitionId.toString());
             on.put("async", asyncMode);
+            on.put("remainingReads", remainingReads);
             on.put("eos", eos.get());
             on.put("failed", failed.get());
             on.put("fileRef", String.valueOf(fileRef));
@@ -347,4 +357,8 @@ public class ResultState implements IStateObject {
             return e.getMessage();
         }
     }
+
+    public synchronized boolean isExhausted() {
+        return remainingReads == 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 58eee79..d081bdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -51,14 +51,16 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
     private final boolean asyncMode;
 
     private final IResultSerializerFactory resultSerializerFactory;
+    private final long maxReads;
 
     public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
-            boolean asyncMode, IResultSerializerFactory resultSerializerFactory) throws IOException {
+            boolean asyncMode, IResultSerializerFactory resultSerializerFactory, long maxReads) throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
         this.ordered = ordered;
         this.asyncMode = asyncMode;
         this.resultSerializerFactory = resultSerializerFactory;
+        this.maxReads = maxReads;
     }
 
     @Override
@@ -87,7 +89,7 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
             public void open() throws HyracksDataException {
                 try {
                     datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
-                            nPartitions);
+                            nPartitions, maxReads);
                     datasetPartitionWriter.open();
                     resultSerializer.init();
                 } catch (HyracksException e) {
@@ -139,7 +141,8 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
                 sb.append("{ ");
                 sb.append("\"rsId\": \"").append(rsId).append("\", ");
                 sb.append("\"ordered\": ").append(ordered).append(", ");
-                sb.append("\"asyncMode\": ").append(asyncMode).append(" }");
+                sb.append("\"asyncMode\": ").append(asyncMode).append(", ");
+                sb.append("\"maxReads\": ").append(maxReads).append(" }");
                 return sb.toString();
             }
         };

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index f169054..080746c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -100,7 +100,7 @@ public class AggregationTest extends AbstractIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         return printer;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index b5c6238..c05b504 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -101,7 +101,7 @@ public class CountOfCountsTest extends AbstractIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
 
@@ -173,7 +173,7 @@ public class CountOfCountsTest extends AbstractIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -246,7 +246,7 @@ public class CountOfCountsTest extends AbstractIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 56bf853..b693b09 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -159,7 +159,7 @@ public class HeapSortMergeTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 67642f4..67845c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -229,7 +229,7 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         return printer;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index b62c011..d7d4219 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -95,7 +95,7 @@ public class ReplicateOperatorTest extends AbstractIntegrationTest {
             spec.addResultSetId(rsId);
 
             outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                    ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                    ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
             PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index dc91dd2..06d7b04 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -71,7 +71,7 @@ public class ScanPrintTest extends AbstractIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
@@ -107,7 +107,7 @@ public class ScanPrintTest extends AbstractIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
@@ -146,7 +146,7 @@ public class ScanPrintTest extends AbstractIntegrationTest {
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index 3043cba..df9c0d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -85,7 +85,7 @@ public class SortMergeTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -135,7 +135,7 @@ public class SortMergeTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 7075fe9..2c055c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -132,7 +132,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -215,7 +215,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -300,7 +300,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -385,7 +385,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -471,7 +471,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -563,7 +563,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -654,7 +654,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -750,7 +750,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/92f7cb58/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 03cd5d4..dc5d0bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -182,7 +182,7 @@ public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -263,7 +263,7 @@ public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -344,7 +344,7 @@ public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -430,7 +430,7 @@ public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);