Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/05/28 01:52:13 UTC

asterixdb git commit: [NO ISSUE][RT] Make start and cancel job uninterruptible

Repository: asterixdb
Updated Branches:
  refs/heads/master 8eb7413b7 -> 1102ed587


[NO ISSUE][RT] Make start and cancel job uninterruptible

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Previously, a thread starting a job through a
  Hyracks client connection could be interrupted
  before it receives the job id. This leaks
  resources, since the job keeps running even
  though no one will ever read its result.
- Similarly, a job cancellation request can be
  interrupted, so the job that was meant to be
  cancelled keeps running.
- To avoid this, a new thread is added to the
  Hyracks client connection that takes care of
  starting and cancelling jobs. The thread
  submitting these requests is made
  uninterruptible until those calls return (see
  the sketch below).
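- For illustration, a minimal, self-contained sketch of the
  pattern described above (hypothetical names, not the code in
  the diff below): the caller enqueues a request and waits for
  its completion while deferring interrupts, and a dedicated
  handler thread, which is never interrupted, performs the
  actual start/cancel call.

  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;

  public final class UninterruptibleSubmitSketch {

      /** A request whose completion the caller awaits without honoring interrupts. */
      static final class Request {
          private boolean completed;
          private Object result;

          synchronized void complete(Object value) {
              result = value;
              completed = true;
              notifyAll();
          }

          /** Wait for completion; remember any interrupt and restore it afterwards. */
          synchronized Object awaitUninterruptibly() {
              boolean interrupted = false;
              while (!completed) {
                  try {
                      wait();
                  } catch (InterruptedException e) {
                      interrupted = true; // keep waiting
                  }
              }
              if (interrupted) {
                  Thread.currentThread().interrupt();
              }
              return result;
          }
      }

      private final BlockingQueue<Request> queue = new ArrayBlockingQueue<>(1);

      public UninterruptibleSubmitSketch() {
          // Dedicated handler thread: callers never interrupt it, so a dequeued
          // request (a stand-in for the start/cancel RPC) always runs to completion.
          Thread handler = new Thread(() -> {
              while (true) {
                  try {
                      queue.take().complete("job-id");
                  } catch (InterruptedException e) {
                      // ignore: this thread should never be interrupted
                  }
              }
          }, "uninterruptible-request-handler");
          handler.setDaemon(true);
          handler.start();
      }

      /** Enqueue the request, retrying across interrupts, then await its result. */
      public Object submit(Request request) {
          boolean interrupted = false;
          while (true) {
              try {
                  queue.put(request);
                  break;
              } catch (InterruptedException e) {
                  interrupted = true;
              }
          }
          Object result = request.awaitUninterruptibly();
          if (interrupted) {
              Thread.currentThread().interrupt();
          }
          return result;
      }
  }

  A caller would invoke new UninterruptibleSubmitSketch().submit(new Request())
  and receive the placeholder result even if the calling thread is interrupted
  while the request is in flight; the interrupt status is restored afterwards.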

Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2639
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1102ed58
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1102ed58
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1102ed58

Branch: refs/heads/master
Commit: 1102ed587d81b88bd2c6c8e012ccda5be47c2350
Parents: 8eb7413
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Fri May 25 00:21:43 2018 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Sun May 27 18:51:45 2018 -0700

----------------------------------------------------------------------
 .../external/indexing/IndexingScheduler.java    |   9 +-
 .../apache/asterix/external/util/HDFSUtils.java |   7 +-
 .../hyracks/api/client/HyracksConnection.java   | 196 ++++++++++++++++++-
 .../java/org/apache/hyracks/util/ExitUtil.java  |   1 +
 4 files changed, 197 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1102ed58/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index edac0fa..f22693a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -33,8 +33,6 @@ import java.util.Random;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -53,19 +51,14 @@ public class IndexingScheduler {
     /** a map from the NC name to the index */
     private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
 
-    /** a map from NC name to the NodeControllerInfo */
-    private Map<String, NodeControllerInfo> ncNameToNcInfos;
-
     /**
      * The constructor of the scheduler.
      *
      * @param ncNameToNcInfos
      * @throws HyracksException
      */
-    public IndexingScheduler(String ipAddress, int port) throws HyracksException {
+    public IndexingScheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
             loadIPAddressToNCMap(ncNameToNcInfos);
         } catch (Exception e) {
             throw HyracksException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1102ed58/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index bd50352..1b13ec5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -67,11 +68,11 @@ public class HDFSUtils {
 
     public static IndexingScheduler initializeIndexingHDFSScheduler(ICCServiceContext serviceCtx)
             throws HyracksDataException {
-        ICCContext ccContext = serviceCtx.getCCContext();
         IndexingScheduler scheduler = null;
         try {
-            scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
-                    ccContext.getClusterControllerInfo().getClientNetPort());
+            ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+            IHyracksClientConnection hcc = appCtx.getHcc();
+            scheduler = new IndexingScheduler(hcc.getNodeControllerInfos());
         } catch (HyracksException e) {
             throw new RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1102ed58/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index f635d94..cfa6f78 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,11 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPut;
@@ -44,10 +49,16 @@ import org.apache.hyracks.api.job.JobInfo;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.ipc.api.RPCInterface;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.InterruptibleAction;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -56,6 +67,9 @@ import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeseri
  * @author vinayakb
  */
 public final class HyracksConnection implements IHyracksClientConnection {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private final String ccHost;
 
     private final int ccPort;
@@ -66,6 +80,15 @@ public final class HyracksConnection implements IHyracksClientConnection {
 
     private final ClusterControllerInfo ccInfo;
 
+    private volatile boolean running = false;
+
+    private volatile long reqId = 0L;
+
+    private final ExecutorService uninterruptibleExecutor = Executors.newFixedThreadPool(2,
+            r -> new Thread(r, "HyracksConnection Uninterrubtible thread: " + r.getClass().getSimpleName()));
+
+    private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1);
+
     /**
      * Constructor to create a connection to the Hyracks Cluster Controller.
      *
@@ -86,6 +109,8 @@ public final class HyracksConnection implements IHyracksClientConnection {
         hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)),
                 rpci);
         ccInfo = hci.getClusterControllerInfo();
+        uninterruptibleExecutor.execute(new UninterrubtileRequestHandler());
+        uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher());
     }
 
     @Override
@@ -95,7 +120,8 @@ public final class HyracksConnection implements IHyracksClientConnection {
 
     @Override
     public void cancelJob(JobId jobId) throws Exception {
-        hci.cancelJob(jobId);
+        CancelJobRequest request = new CancelJobRequest(jobId);
+        uninterruptiblySubmitAndExecute(request);
     }
 
     @Override
@@ -131,12 +157,13 @@ public final class HyracksConnection implements IHyracksClientConnection {
 
     @Override
     public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
-        return hci.startJob(deployedJobSpecId, jobParameters);
+        StartDeployedJobRequest request = new StartDeployedJobRequest(deployedJobSpecId, jobParameters);
+        return interruptiblySubmitAndExecute(request);
     }
 
     @Override
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
-        return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+        return startJob(null, acggf, jobFlags);
     }
 
     public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
@@ -154,7 +181,7 @@ public final class HyracksConnection implements IHyracksClientConnection {
             hci.waitForCompletion(jobId);
         } catch (InterruptedException e) {
             // Cancels an on-going job if the current thread gets interrupted.
-            hci.cancelJob(jobId);
+            cancelJob(jobId);
             throw e;
         }
     }
@@ -232,7 +259,8 @@ public final class HyracksConnection implements IHyracksClientConnection {
     @Override
     public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
             EnumSet<JobFlag> jobFlags) throws Exception {
-        return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
+        StartJobRequest request = new StartJobRequest(deploymentId, acggf, jobFlags);
+        return interruptiblySubmitAndExecute(request);
     }
 
     @Override
@@ -269,4 +297,162 @@ public final class HyracksConnection implements IHyracksClientConnection {
     public boolean isConnected() {
         return hci.isConnected();
     }
+
+    private <T> T uninterruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception {
+        InvokeUtil.doUninterruptibly(() -> uninterruptibles.put(request));
+        return uninterruptiblyExecute(request);
+    }
+
+    private <T> T uninterruptiblyExecute(UnInterruptibleRequest<T> request) throws Exception {
+        InvokeUtil.doUninterruptibly(request);
+        return request.result();
+    }
+
+    private <T> T interruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception {
+        uninterruptibles.put(request);
+        return uninterruptiblyExecute(request);
+    }
+
+    private abstract class UnInterruptibleRequest<T> implements InterruptibleAction {
+        boolean completed = false;
+        boolean failed = false;
+        Throwable failure = null;
+        T response = null;
+
+        @SuppressWarnings("squid:S1181")
+        private final void handle() {
+            try {
+                response = doHandle();
+            } catch (Throwable th) {
+                failed = true;
+                failure = th;
+            } finally {
+                synchronized (this) {
+                    completed = true;
+                    notifyAll();
+                }
+            }
+        }
+
+        protected abstract T doHandle() throws Exception;
+
+        @Override
+        public final synchronized void run() throws InterruptedException {
+            while (!completed) {
+                wait();
+            }
+        }
+
+        public T result() throws Exception {
+            if (failed) {
+                if (failure instanceof Error) {
+                    throw (Error) failure;
+                }
+                throw (Exception) failure;
+            }
+            return response;
+        }
+    }
+
+    private class CancelJobRequest extends UnInterruptibleRequest<Void> {
+        final JobId jobId;
+
+        public CancelJobRequest(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        protected Void doHandle() throws Exception {
+            hci.cancelJob(jobId);
+            return null;
+        }
+
+    }
+
+    private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> {
+
+        private final DeployedJobSpecId deployedJobSpecId;
+        private final Map<byte[], byte[]> jobParameters;
+
+        public StartDeployedJobRequest(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.jobParameters = jobParameters;
+        }
+
+        @Override
+        protected JobId doHandle() throws Exception {
+            return hci.startJob(deployedJobSpecId, jobParameters);
+        }
+
+    }
+
+    private class StartJobRequest extends UnInterruptibleRequest<JobId> {
+        private final DeploymentId deploymentId;
+        private final IActivityClusterGraphGeneratorFactory acggf;
+        private final EnumSet<JobFlag> jobFlags;
+
+        public StartJobRequest(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
+                EnumSet<JobFlag> jobFlags) {
+            this.deploymentId = deploymentId;
+            this.acggf = acggf;
+            this.jobFlags = jobFlags;
+        }
+
+        @Override
+        protected JobId doHandle() throws Exception {
+            if (deploymentId == null) {
+                return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+            } else {
+                return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
+            }
+        }
+
+    }
+
+    private class UninterrubtileRequestHandler implements Runnable {
+        @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    UnInterruptibleRequest<?> next = uninterruptibles.take();
+                    reqId++;
+                    running = true;
+                    next.handle();
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+                    continue;
+                } finally {
+                    running = false;
+                }
+            }
+        }
+    }
+
+    public class UninterrubtileHandlerWatcher implements Runnable {
+        @Override
+        @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+        public void run() {
+            long currentReqId = 0L;
+            long currentTime = System.nanoTime();
+            while (true) {
+                try {
+                    TimeUnit.MINUTES.sleep(1);
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+                    continue;
+                }
+                if (running) {
+                    if (reqId == currentReqId) {
+                        if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
+                            ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+                        }
+                    } else {
+                        currentReqId = reqId;
+                        currentTime = System.nanoTime();
+                    }
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1102ed58/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 4aa123b..c8b9112 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -34,6 +34,7 @@ public class ExitUtil {
     public static final int EC_FAILED_TO_STARTUP = 2;
     public static final int EC_FAILED_TO_RECOVER = 3;
     public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
+    public static final int EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST = 5;
     public static final int EC_UNHANDLED_EXCEPTION = 11;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;