You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/30 17:10:37 UTC

asterixdb git commit: [ASTERIXDB-1706][RT] Ensure All Result Frames Are Read

Repository: asterixdb
Updated Branches:
  refs/heads/master b2abe1e90 -> 85d4627e3


[ASTERIXDB-1706][RT] Ensure All Result Frames Are Read

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Removed IDatasetInputChannelMonitor

Details:
- Currently there is a possibility that the EOS
  arrives in DatasetInputChannelMonitor right after
  the check for available frames is performed, which
  will result in some result frames never being
  read. When this happens, an empty result will
  be returned if no frames were read before. This
  change ensures that the state between checking
  the available frames and the EOS is consistent.
- Clean up HyracksDatasetReader.
- Use error code for result read failure.

Change-Id: I7d5a78fa20fe200cfffd21a215e052481c6d61ca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2337
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/85d4627e
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/85d4627e
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/85d4627e

Branch: refs/heads/master
Commit: 85d4627e3c89125c55756153560d093c346b4451
Parents: b2abe1e
Author: Murtadha Hubail <mh...@apache.org>
Authored: Tue Jan 30 15:31:13 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Tue Jan 30 09:09:55 2018 -0800

----------------------------------------------------------------------
 .../async-deferred/AsyncDeferredQueries.xml     |   2 +-
 .../api/dataset/DatasetDirectoryRecord.java     |   2 +-
 .../dataset/IDatasetInputChannelMonitor.java    |  31 ---
 .../hyracks/api/exceptions/ErrorCode.java       |   2 +-
 .../src/main/resources/errormsg/en.properties   |   2 +-
 .../client/dataset/HyracksDatasetReader.java    | 258 +++++++++----------
 6 files changed, 121 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/85d4627e/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
index fe030a8..1ae94ec 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -52,7 +52,7 @@
     <test-case FilePath="async-deferred">
         <compilation-unit name="async-exhausted-result">
             <output-dir compare="Text">async-exhausted-result</output-dir>
-            <expected-error>Job Failed</expected-error>
+            <expected-error>HYR0093</expected-error>
         </compilation-unit>
     </test-case>
 </test-group>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/85d4627e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
index 3165840..e47b1e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -58,7 +58,7 @@ public class DatasetDirectoryRecord implements Serializable {
         this.empty = empty;
     }
 
-    public boolean getEmpty() {
+    public boolean isEmpty() {
         return empty;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/85d4627e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java
deleted file mode 100644
index bce321f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.api.dataset;
-
-import org.apache.hyracks.api.channels.IInputChannelMonitor;
-
-public interface IDatasetInputChannelMonitor extends IInputChannelMonitor {
-    public boolean eosReached();
-
-    public boolean failed();
-
-    public int getNFramesAvailable();
-
-    public void notifyFrameRead();
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/85d4627e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 35fdb2e..8b47171 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -126,7 +126,7 @@ public class ErrorCode {
     public static final int ILLEGAL_MEMORY_BUDGET = 90;
     public static final int TIMEOUT = 91;
     public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92;
-    // 93
+    public static final int FAILED_TO_READ_RESULT = 93;
     public static final int CANNOT_READ_CLOSED_FILE = 94;
     public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95;
     public static final int ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT = 96;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/85d4627e/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 465a661..a27a736 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -109,7 +109,7 @@
 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
 91 = Operation timed out
 92 = Job %1$s has been cleared from job history
-# 93
+93 = Failed to read result for job %1$s
 94 = Cannot read closed file (%1$s)
 95 = Tuple of size %1$s cannot fit into an empty frame
 96 = Illegal attempt to enter empty component

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/85d4627e/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
index e7c9042..36c77ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
@@ -23,75 +23,59 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.channels.IInputChannelMonitor;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
 import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IDatasetInputChannelMonitor;
 import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.client.net.ClientNetworkManager;
 import org.apache.hyracks.comm.channels.DatasetNetworkInputChannel;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-// TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
+@NotThreadSafe
 public class HyracksDatasetReader implements IHyracksDatasetReader {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
 
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int NUM_READ_BUFFERS = 1;
+    private final IHyracksDatasetDirectoryServiceConnection datasetDirectory;
     private final ClientNetworkManager netManager;
-
     private final IHyracksCommonContext datasetClientCtx;
-
-    private JobId jobId;
-
-    private ResultSetId resultSetId;
-
+    private final JobId jobId;
+    private final ResultSetId resultSetId;
     private DatasetDirectoryRecord[] knownRecords;
+    private DatasetInputChannelMonitor[] monitors;
+    private DatasetInputChannelMonitor currentRecordMonitor;
+    private DatasetNetworkInputChannel currentRecordChannel;
+    private int currentRecord;
 
-    private IDatasetInputChannelMonitor[] monitors;
-
-    private int lastReadPartition;
-
-    private IDatasetInputChannelMonitor lastMonitor;
-
-    private DatasetNetworkInputChannel resultChannel;
-
-    private static int NUM_READ_BUFFERS = 1;
-
-    public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection,
+    public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectory,
             ClientNetworkManager netManager, IHyracksCommonContext datasetClientCtx, JobId jobId,
-            ResultSetId resultSetId) throws Exception {
-        this.datasetDirectoryServiceConnection = datasetDirectoryServiceConnection;
+            ResultSetId resultSetId) {
+        this.datasetDirectory = datasetDirectory;
         this.netManager = netManager;
         this.datasetClientCtx = datasetClientCtx;
         this.jobId = jobId;
         this.resultSetId = resultSetId;
-        knownRecords = null;
-        monitors = null;
-        lastReadPartition = -1;
-        lastMonitor = null;
-        resultChannel = null;
+        currentRecord = -1;
     }
 
     @Override
     public Status getResultStatus() {
         try {
-            return datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
+            return datasetDirectory.getDatasetResultStatus(jobId, resultSetId);
         } catch (HyracksDataException e) {
             if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
                 LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e);
@@ -102,107 +86,55 @@ public class HyracksDatasetReader implements IHyracksDatasetReader {
         return null;
     }
 
-    private DatasetDirectoryRecord getRecord(int partition) throws Exception {
-        while (knownRecords == null || knownRecords[partition] == null) {
-            knownRecords =
-                    datasetDirectoryServiceConnection.getDatasetResultLocations(jobId, resultSetId, knownRecords);
-        }
-        return knownRecords[partition];
-    }
-
-    private boolean nextPartition() throws HyracksDataException {
-        ++lastReadPartition;
-        try {
-            DatasetDirectoryRecord record = getRecord(lastReadPartition);
-            while (record.getEmpty() && (++lastReadPartition) < knownRecords.length) {
-                record = getRecord(lastReadPartition);
-            }
-            if (lastReadPartition == knownRecords.length) {
-                return false;
-            }
-            resultChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId,
-                    lastReadPartition, NUM_READ_BUFFERS);
-            lastMonitor = getMonitor(lastReadPartition);
-            resultChannel.registerMonitor(lastMonitor);
-            resultChannel.open(datasetClientCtx);
-            return true;
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
     @Override
     public int read(IFrame frame) throws HyracksDataException {
         frame.reset();
-        ByteBuffer readBuffer;
         int readSize = 0;
-
-        if (lastReadPartition == -1) {
-            if (!nextPartition()) {
-                return readSize;
-            }
-        }
-
-        while (readSize < frame.getFrameSize()
-                && !((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor))) {
-            waitForNextFrame(lastMonitor);
-            if (isPartitionReadComplete(lastMonitor)) {
-                knownRecords[lastReadPartition].readEOS();
-                resultChannel.close();
-                if ((lastReadPartition == knownRecords.length - 1) || !nextPartition()) {
-                    break;
+        if (isFirstRead() && !hasNextRecord()) {
+            return readSize;
+        }
+        // read until frame is full or all dataset records have been read
+        while (readSize < frame.getFrameSize()) {
+            if (currentRecordMonitor.hasMoreFrames()) {
+                final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
+                if (readBuffer == null) {
+                    throw new IllegalStateException("Unexpected empty frame");
+                }
+                currentRecordMonitor.notifyFrameRead();
+                if (readSize == 0) {
+                    final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
+                    frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+                    frame.getBuffer().clear();
                 }
+                frame.getBuffer().put(readBuffer);
+                currentRecordChannel.recycleBuffer(readBuffer);
+                readSize = frame.getBuffer().position();
             } else {
-                readBuffer = resultChannel.getNextBuffer();
-                lastMonitor.notifyFrameRead();
-                if (readBuffer != null) {
-                    if (readSize <= 0) {
-                        int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
-                        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
-                        frame.getBuffer().clear();
-                        frame.getBuffer().put(readBuffer);
-                        resultChannel.recycleBuffer(readBuffer);
-                        readSize = frame.getBuffer().position();
-                    } else {
-                        frame.getBuffer().put(readBuffer);
-                        resultChannel.recycleBuffer(readBuffer);
-                        readSize = frame.getBuffer().position();
-                    }
+                currentRecordChannel.close();
+                if (currentRecordMonitor.failed()) {
+                    throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
+                }
+                if (isLastRecord() || !hasNextRecord()) {
+                    break;
                 }
             }
         }
-
         frame.getBuffer().flip();
         return readSize;
     }
 
-    private static void waitForNextFrame(IDatasetInputChannelMonitor monitor) throws HyracksDataException {
-        synchronized (monitor) {
-            while (monitor.getNFramesAvailable() <= 0 && !monitor.eosReached() && !monitor.failed()) {
-                try {
-                    monitor.wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-        if (monitor.failed()) {
-            throw new HyracksDataException("Job Failed.");
+    private SocketAddress getSocketAddress(DatasetDirectoryRecord record) throws HyracksDataException {
+        try {
+            final NetworkAddress netAddr = record.getNetworkAddress();
+            return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
+        } catch (UnknownHostException e) {
+            throw HyracksDataException.create(e);
         }
     }
 
-    private boolean isPartitionReadComplete(IDatasetInputChannelMonitor monitor) {
-        return (monitor.getNFramesAvailable() <= 0) && (monitor.eosReached());
-    }
-
-    private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
-        NetworkAddress netAddr = addr.getNetworkAddress();
-        return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
-    }
-
-    private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
+    private DatasetInputChannelMonitor getMonitor(int partition) {
         if (knownRecords == null || knownRecords[partition] == null) {
-            throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
+            throw new IllegalStateException("Accessing monitors before obtaining the corresponding addresses");
         }
         if (monitors == null) {
             monitors = new DatasetInputChannelMonitor[knownRecords.length];
@@ -213,56 +145,100 @@ public class HyracksDatasetReader implements IHyracksDatasetReader {
         return monitors[partition];
     }
 
-    private class DatasetInputChannelMonitor implements IDatasetInputChannelMonitor {
-        private final AtomicInteger nAvailableFrames;
+    private boolean hasNextRecord() throws HyracksDataException {
+        currentRecord++;
+        DatasetDirectoryRecord record = getRecord(currentRecord);
+        // skip empty records
+        while (record.isEmpty() && ++currentRecord < knownRecords.length) {
+            record = getRecord(currentRecord);
+        }
+        if (currentRecord == knownRecords.length) {
+            // exhausted all known records
+            return false;
+        }
+        requestRecordData(record);
+        return true;
+    }
+
+    private DatasetDirectoryRecord getRecord(int recordNum) throws HyracksDataException {
+        try {
+            while (knownRecords == null || knownRecords[recordNum] == null) {
+                knownRecords = datasetDirectory.getDatasetResultLocations(jobId, resultSetId, knownRecords);
+            }
+            return knownRecords[recordNum];
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void requestRecordData(DatasetDirectoryRecord record) throws HyracksDataException {
+        currentRecordChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId,
+                currentRecord, NUM_READ_BUFFERS);
+        currentRecordMonitor = getMonitor(currentRecord);
+        currentRecordChannel.registerMonitor(currentRecordMonitor);
+        currentRecordChannel.open(datasetClientCtx);
+    }
 
-        private final AtomicBoolean eos;
+    private boolean isFirstRead() {
+        return currentRecord == -1;
+    }
 
-        private final AtomicBoolean failed;
+    private boolean isLastRecord() {
+        return knownRecords != null && currentRecord == knownRecords.length - 1;
+    }
 
-        public DatasetInputChannelMonitor() {
-            nAvailableFrames = new AtomicInteger(0);
-            eos = new AtomicBoolean(false);
-            failed = new AtomicBoolean(false);
+    private static class DatasetInputChannelMonitor implements IInputChannelMonitor {
+
+        private int availableFrames;
+        private boolean eos;
+        private boolean failed;
+
+        DatasetInputChannelMonitor() {
+            eos = false;
+            failed = false;
         }
 
         @Override
         public synchronized void notifyFailure(IInputChannel channel) {
-            failed.set(true);
+            failed = true;
             notifyAll();
         }
 
         @Override
         public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
-            nAvailableFrames.addAndGet(nFrames);
+            availableFrames += nFrames;
             notifyAll();
         }
 
         @Override
         public synchronized void notifyEndOfStream(IInputChannel channel) {
-            eos.set(true);
+            eos = true;
             notifyAll();
         }
 
-        @Override
-        public synchronized boolean eosReached() {
-            return eos.get();
+        synchronized boolean failed() {
+            return failed;
         }
 
-        @Override
-        public synchronized boolean failed() {
-            return failed.get();
+        synchronized void notifyFrameRead() {
+            availableFrames--;
+            notifyAll();
         }
 
-        @Override
-        public synchronized int getNFramesAvailable() {
-            return nAvailableFrames.get();
+        synchronized boolean hasMoreFrames() throws HyracksDataException {
+            while (!failed && !eos && availableFrames == 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
+            }
+            return !failed && !isFullyConsumed();
         }
 
-        @Override
-        public synchronized void notifyFrameRead() {
-            nAvailableFrames.decrementAndGet();
+        private synchronized boolean isFullyConsumed() {
+            return availableFrames == 0 && eos;
         }
-
     }
 }