You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/03/23 15:10:03 UTC

[1/4] asterixdb git commit: [NO ISSUE][RT] Ensure all NC tasks are aborted before joining

Repository: asterixdb
Updated Branches:
  refs/heads/master 2ebd0d4f0 -> ce6755422


[NO ISSUE][RT] Ensure all NC tasks are aborted before joining

- user model changes: no
- storage format changes: no
- interface changes: yes
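  - Add getApplication() and notifyRegistrationCompleted(CcId) to NodeControllerService
  - Add getPendingThreads() and isCompleted() to Task
  - Add ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS (exit code 4)
  - Span: rename the old remaining(TimeUnit), which actually returned the
    elapsed time, to elapsed(TimeUnit), and add a new remaining(TimeUnit)
    that returns the time left in the span (never negative)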

details:
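- This change ensures that, whenever a CC registers with an NC, all tasks
  previously started on that NC on behalf of that CC are aborted and have
  completed before the NC completes its registration with that CC. If the
  aborted tasks do not complete within two minutes, the NC logs the stuck
  threads and halts.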

Change-Id: I0517e5a390d50e8703ffdbecbb84467c22edda85
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2507
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a77ec691
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a77ec691
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a77ec691

Branch: refs/heads/master
Commit: a77ec691d466fab8454b883d7e004074c6c9cf84
Parents: e6587f6
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Wed Mar 21 14:26:48 2018 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Wed Mar 21 18:24:50 2018 -0700

----------------------------------------------------------------------
 .../hyracks/bootstrap/NCApplication.java        |   1 +
 .../hyracks/control/cc/cluster/NodeManager.java |  15 ++-
 .../control/nc/NodeControllerService.java       |  19 ++--
 .../org/apache/hyracks/control/nc/Task.java     |  17 +++-
 .../control/nc/work/AbortAllJobsWork.java       |  10 +-
 .../nc/work/EnsureAllCcTasksCompleted.java      | 102 +++++++++++++++++++
 .../java/org/apache/hyracks/util/ExitUtil.java  |   1 +
 .../main/java/org/apache/hyracks/util/Span.java |  12 ++-
 8 files changed, 150 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 494198b..57d080e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -211,6 +211,7 @@ public class NCApplication extends BaseNCApplication {
                 ? getCurrentSystemState() : SystemState.HEALTHY;
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
                 currentStatus, systemState);
+        ncs.notifyRegistrationCompleted(ccId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index e1a36cd..b44a6bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -103,14 +103,13 @@ public class NodeManager implements INodeManager {
         if (nodeRegistry.containsKey(nodeId)) {
             LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
             failNode(nodeId);
-        } else {
-            try {
-                // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
-                IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
-                ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
-            } catch (IPCException e) {
-                throw HyracksDataException.create(e);
-            }
+        }
+        try {
+            // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
+            IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
+            ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
+        } catch (IPCException e) {
+            throw HyracksDataException.create(e);
         }
         LOGGER.warn("adding node to registry");
         nodeRegistry.put(nodeId, ncState);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 98f5c70..6d54843 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -363,8 +363,7 @@ public class NodeControllerService implements IControllerService {
             };
             ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy(
                     ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            CcConnection ccc = new CcConnection(ccProxy);
-            return registerNode(ccc, ccAddress);
+            return registerNode(new CcConnection(ccProxy), ccAddress);
         }
     }
 
@@ -415,7 +414,6 @@ public class NodeControllerService implements IControllerService {
 
     public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
-
         int registrationId = nextRegistrationId.incrementAndGet();
         pendingRegistrations.put(registrationId, ccc);
         CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
@@ -425,10 +423,8 @@ public class NodeControllerService implements IControllerService {
         if (distributedState != null) {
             getDistributedState().put(ccId, distributedState);
         }
-        application.onRegisterNode(ccId);
         IClusterController ccs = ccc.getClusterControllerService();
         NodeParameters nodeParameters = ccc.getNodeParameters();
-
         // Start heartbeat generator.
         if (!heartbeatThreads.containsKey(ccId)) {
             Thread heartbeatThread = new Thread(
@@ -445,8 +441,6 @@ public class NodeControllerService implements IControllerService {
             ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
             ccTimers.put(ccId, ccTimer);
         }
-        ccc.notifyRegistrationCompleted();
-        LOGGER.info("Registering with Cluster Controller {} complete", ccc);
         return ccId;
     }
 
@@ -708,7 +702,6 @@ public class NodeControllerService implements IControllerService {
     }
 
     private class TraceCurrentTimeTask extends TimerTask {
-
         private ITracer tracer;
         private long traceCategory;
 
@@ -726,4 +719,14 @@ public class NodeControllerService implements IControllerService {
             }
         }
     }
+
+    public INCApplication getApplication() {
+        return application;
+    }
+
+    public void notifyRegistrationCompleted(CcId ccId) {
+        CcConnection ccc = getCcConnection(ccId);
+        ccc.notifyRegistrationCompleted();
+        LOGGER.info("Registering with Cluster Controller {} complete", ccc);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9d99968..d7b930c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -22,6 +22,7 @@ import static org.apache.hyracks.api.exceptions.ErrorCode.TASK_ABORTED;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.LinkedHashSet;
@@ -112,6 +113,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
 
     private final IStatsCollector statsCollector;
 
+    private volatile boolean completed = false;
+
     public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
             ExecutorService executor, NodeControllerService ncs,
             List<List<PartitionChannel>> inputChannelsFromConnectors) {
@@ -255,8 +258,11 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         if (aborted) {
             return false;
         }
-        pendingThreads.add(t);
-        return true;
+        return pendingThreads.add(t);
+    }
+
+    public synchronized List<Thread> getPendingThreads() {
+        return new ArrayList<>(pendingThreads);
     }
 
     private synchronized void removePendingThread(Thread t) {
@@ -300,8 +306,6 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
                         executorService.execute(() -> {
                             try {
                                 Thread thread = Thread.currentThread();
-                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
-                                // the thread is not escaped from interruption.
                                 if (!addPendingThread(thread)) {
                                     return;
                                 }
@@ -345,6 +349,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         } finally {
             close();
             removePendingThread(ct);
+            completed = true;
         }
         if (!exceptions.isEmpty()) {
             if (LOGGER.isWarnEnabled()) {
@@ -460,4 +465,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     public IStatsCollector getStatsCollector() {
         return statsCollector;
     }
+
+    public boolean isCompleted() {
+        return completed;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 2bcf414..c6696fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hyracks.control.nc.work;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.Deque;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -50,13 +52,14 @@ public class AbortAllJobsWork extends SynchronizableWork {
         if (dpm == null) {
             LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
         }
+        Deque<Task> abortedTasks = new ArrayDeque<>();
         Collection<Joblet> joblets = ncs.getJobletMap().values();
         // TODO(mblow): should we have one jobletmap per cc?
         joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> {
-            Collection<Task> tasks = joblet.getTaskMap().values();
-            for (Task task : tasks) {
+            joblet.getTaskMap().values().forEach(task -> {
                 task.abort();
-            }
+                abortedTasks.add(task);
+            });
             final JobId jobId = joblet.getJobId();
             if (dpm != null) {
                 dpm.abortReader(jobId);
@@ -64,5 +67,6 @@ public class AbortAllJobsWork extends SynchronizableWork {
             }
             ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
         });
+        ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId, abortedTasks));
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
new file mode 100644
index 0000000..156a5c9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.Task;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.Span;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@SuppressWarnings({ "squid:S1181", "squid:S1166" })
+public class EnsureAllCcTasksCompleted implements Runnable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2);
+    private final NodeControllerService ncs;
+    private final CcId ccId;
+    private final Deque<Task> abortedTasks;
+    private final Span span;
+
+    public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> abortedTasks) {
+        this.ncs = ncs;
+        this.ccId = ccId;
+        this.abortedTasks = abortedTasks;
+        span = Span.start(2, TimeUnit.MINUTES);
+    }
+
+    @Override
+    public void run() {
+        try {
+            LOGGER.log(Level.INFO, "Ensuring all tasks of {} have completed", ccId);
+            while (!span.elapsed()) {
+                removeAborted();
+                if (abortedTasks.isEmpty()) {
+                    break;
+                }
+                LOGGER.log(Level.INFO, "{} tasks are still running", abortedTasks.size());
+                Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a second
+            }
+            if (abortedTasks.isEmpty()) {
+                LOGGER.log(Level.INFO, "All tasks of {} have completed, Completing registration", ccId);
+                // all tasks has completed
+                ncs.getApplication().onRegisterNode(ccId);
+            } else {
+                LOGGER.log(Level.ERROR,
+                        "Failed to abort all previous tasks associated with CC {} after {}ms. Giving up", ccId,
+                        TIMEOUT);
+                LOGGER.log(Level.ERROR, "{} tasks failed to complete within timeout", abortedTasks.size());
+                abortedTasks.forEach(task -> {
+                    List<Thread> pendingThreads = task.getPendingThreads();
+                    LOGGER.log(Level.ERROR, "task {} was stuck. Stuck thread count = {}", task.getTaskAttemptId(),
+                            pendingThreads.size());
+                    pendingThreads.forEach(thread -> {
+                        LOGGER.log(Level.ERROR, "Stuck thread trace: {}", Arrays.toString(thread.getStackTrace()));
+                    });
+                });
+                ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+            }
+        } catch (Throwable th) {
+            try {
+                LOGGER.log(Level.ERROR, "Failed to abort all previous tasks associated with CC {}", ccId, th);
+            } catch (Throwable ignore) {
+                // Ignore logging errors
+            }
+            ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+        }
+    }
+
+    private void removeAborted() {
+        int numTasks = abortedTasks.size();
+        for (int i = 0; i < numTasks; i++) {
+            Task task = abortedTasks.poll();
+            if (!task.isCompleted()) {
+                abortedTasks.add(task);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index db5cd13..1a17012 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -33,6 +33,7 @@ public class ExitUtil {
     public static final int EC_ABNORMAL_TERMINATION = 1;
     public static final int EC_FAILED_TO_STARTUP = 2;
     public static final int EC_FAILED_TO_RECOVER = 3;
+    public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
     public static final int EC_UNHANDLED_EXCEPTION = 11;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a77ec691/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
index 95db604..d8d6bb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -34,15 +34,19 @@ public class Span {
     }
 
     public boolean elapsed() {
-        return remaining(TimeUnit.NANOSECONDS) > spanNanos;
+        return elapsed(TimeUnit.NANOSECONDS) > spanNanos;
     }
 
-    public long remaining(TimeUnit unit) {
+    public long elapsed(TimeUnit unit) {
         return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
     }
 
     public void sleep(long sleep, TimeUnit unit) throws InterruptedException {
-        TimeUnit.NANOSECONDS.sleep(Math.min(remaining(TimeUnit.NANOSECONDS), unit.toNanos(sleep)));
+        TimeUnit.NANOSECONDS.sleep(Math.min(elapsed(TimeUnit.NANOSECONDS), unit.toNanos(sleep)));
+    }
+
+    public long remaining(TimeUnit unit) {
+        return unit.convert(Long.max(spanNanos - elapsed(TimeUnit.NANOSECONDS), 0L), TimeUnit.NANOSECONDS);
     }
 
     public void loopUntilExhausted(ThrowingAction action) throws Exception {
@@ -52,7 +56,7 @@ public class Span {
     public void loopUntilExhausted(ThrowingAction action, long delay, TimeUnit delayUnit) throws Exception {
         while (!elapsed()) {
             action.run();
-            if (remaining(delayUnit) < delay) {
+            if (elapsed(delayUnit) < delay) {
                 break;
             }
             delayUnit.sleep(delay);


[3/4] asterixdb git commit: [NO ISSUE] IOCounter improvements

Posted by mb...@apache.org.
[NO ISSUE] IOCounter improvements

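- cache IO counter results for a short period (500 ms)
- split the iostat-based and proc-fs-based implementations into separate
  classes (IOCounterIoStat and IOCounterProc, replacing IOCounterLinux)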

Change-Id: I7789171db6b6d7eea3561c24467af63f065f5dc6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2515
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Tested-by: Murtadha Hubail <mh...@apache.org>
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/94b6da6a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/94b6da6a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/94b6da6a

Branch: refs/heads/master
Commit: 94b6da6ae33749dfdee35e35f32c165693015e3d
Parents: 2587f1d
Author: Michael Blow <mb...@apache.org>
Authored: Thu Mar 22 20:15:37 2018 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Thu Mar 22 22:11:31 2018 -0700

----------------------------------------------------------------------
 .../control/nc/io/profiling/IIOCounter.java     |   8 +-
 .../control/nc/io/profiling/IOCounterCache.java |  41 ++++++
 .../nc/io/profiling/IOCounterDefault.java       |   6 +-
 .../nc/io/profiling/IOCounterFactory.java       |   9 +-
 .../nc/io/profiling/IOCounterIoStat.java        | 101 +++++++++++++++
 .../control/nc/io/profiling/IOCounterLinux.java | 128 -------------------
 .../control/nc/io/profiling/IOCounterOSX.java   |   6 +-
 .../control/nc/io/profiling/IOCounterProc.java  |  71 ++++++++++
 8 files changed, 231 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java
index 3612d8f..a85ca2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IIOCounter.java
@@ -22,12 +22,12 @@ package org.apache.hyracks.control.nc.io.profiling;
 public interface IIOCounter {
 
     /**
-     * @return the number of block reads from the very beginning
+     * @return the number of block reads from the very beginning, or -1 if not available on this platform
      */
-    public long getReads();
+    long getReads();
 
     /**
-     * @return the number of block writes from the very beginning
+     * @return the number of block writes from the very beginning, or -1 if not available on this platform
      */
-    public long getWrites();
+    long getWrites();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterCache.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterCache.java
new file mode 100644
index 0000000..842d82b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterCache.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.io.profiling;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.util.Span;
+
+abstract class IOCounterCache<T> implements IIOCounter {
+    private static final long TTL_NANOS = TimeUnit.MILLISECONDS.toNanos(500);
+    private Span span;
+    private T info;
+
+    protected synchronized T getInfo() throws IOException {
+        if (info == null || span.elapsed()) {
+            span = Span.start(TTL_NANOS, TimeUnit.NANOSECONDS);
+            info = calculateInfo();
+        }
+        return info;
+    }
+
+    protected abstract T calculateInfo() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java
index 1f8669d..d5ec9a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterDefault.java
@@ -21,14 +21,16 @@ package org.apache.hyracks.control.nc.io.profiling;
 
 public class IOCounterDefault implements IIOCounter {
 
+    public static final long IO_COUNTER_UNAVAILABLE = -1;
+
     @Override
     public long getReads() {
-        return 0;
+        return IO_COUNTER_UNAVAILABLE;
     }
 
     @Override
     public long getWrites() {
-        return 0;
+        return IO_COUNTER_UNAVAILABLE;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
index 2301ae6..36b310d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
@@ -33,9 +33,12 @@ public class IOCounterFactory {
      */
     public IIOCounter getIOCounter() {
         String osName = System.getProperty("os.name").toLowerCase();
-        if (osName.indexOf("nix") >= 0 || osName.indexOf("nux") >= 0 || osName.indexOf("aix") >= 0) {
-            return new IOCounterLinux();
-        } else if (osName.indexOf("mac") >= 0) {
+        if (osName.contains("nix") || osName.contains("nux") || osName.contains("aix")) {
+            if (IOCounterProc.STATFILE.exists()) {
+                return new IOCounterProc();
+            }
+            return new IOCounterIoStat();
+        } else if (osName.contains("mac")) {
             return new IOCounterOSX();
         } else {
             return new IOCounterDefault();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterIoStat.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterIoStat.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterIoStat.java
new file mode 100644
index 0000000..560035b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterIoStat.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.io.profiling;
+
+import static org.apache.hyracks.control.nc.io.profiling.IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link IIOCounter} that derives byte counts from the {@code iostat} report, summed over all device rows
+ * and reported relative to the totals observed when the counter was created.
+ */
+public class IOCounterIoStat extends IOCounterCache<List<String>> {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final String COMMAND = "iostat";
+    // "Blk_*" columns are counted in 512-byte blocks
+    private static final int PAGE_SIZE = 512;
+    private static final long KILOBYTE = 1024L;
+    private static final long MEGABYTE = KILOBYTE * KILOBYTE;
+    private static final String READ_COLUMN = "read";
+    private static final String WRITE_COLUMN = "wrtn";
+    private long failureCount;
+
+    private final long baseReads;
+    private final long baseWrites;
+
+    IOCounterIoStat() {
+        // Read raw totals directly instead of calling the overridable getReads()/getWrites(), which would
+        // store the IO_COUNTER_UNAVAILABLE sentinel as the baseline whenever iostat is unusable.
+        baseReads = baseline(READ_COLUMN);
+        baseWrites = baseline(WRITE_COLUMN);
+    }
+
+    @Override
+    public long getReads() {
+        try {
+            long reads = extractBytes(READ_COLUMN);
+            return reads == 0 ? IO_COUNTER_UNAVAILABLE : reads - baseReads;
+        } catch (Exception e) {
+            LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting reads", e);
+            return IO_COUNTER_UNAVAILABLE;
+        }
+    }
+
+    @Override
+    public long getWrites() {
+        try {
+            long writes = extractBytes(WRITE_COLUMN);
+            return writes == 0 ? IO_COUNTER_UNAVAILABLE : writes - baseWrites;
+        } catch (Exception e) {
+            LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting writes", e);
+            return IO_COUNTER_UNAVAILABLE;
+        }
+    }
+
+    /**
+     * Returns the current raw byte total for {@code column}, or 0 when it cannot be obtained, so that a
+     * failed baseline never offsets later successful readings by the unavailable sentinel.
+     */
+    private long baseline(String column) {
+        try {
+            return extractBytes(column);
+        } catch (Exception e) {
+            LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting baseline", e);
+            return 0;
+        }
+    }
+
+    /**
+     * Sums the cumulative {@code <unit>_<column>} value (e.g. {@code Blk_read}, {@code kB_wrtn}) over every
+     * row following the matching iostat header line and converts the result to bytes. The column is
+     * located by name because its position and unit depend on the iostat version and environment.
+     *
+     * @param column {@code "read"} or {@code "wrtn"}
+     * @return the total in bytes, or 0 if no matching header line was found
+     * @throws IOException if the iostat output could not be obtained
+     * @throws NumberFormatException if a row under the header has a non-numeric value in the column
+     */
+    private long extractBytes(String column) throws IOException {
+        int columnIndex = -1;
+        long bytesPerUnit = 0;
+        long total = 0;
+        for (String line : getInfo()) {
+            String[] tokens = line.trim().split("\\s+");
+            int headerIndex = -1;
+            long headerUnit = 0;
+            for (int i = 0; i < tokens.length && headerIndex < 0; i++) {
+                headerUnit = unitBytes(tokens[i], column);
+                if (headerUnit > 0) {
+                    headerIndex = i;
+                }
+            }
+            if (headerIndex >= 0) {
+                columnIndex = headerIndex;
+                bytesPerUnit = headerUnit;
+            } else if (columnIndex >= 0 && tokens.length > columnIndex) {
+                total += Long.parseLong(tokens[columnIndex]);
+            }
+        }
+        return total * bytesPerUnit;
+    }
+
+    /** Returns the size in bytes of one unit if {@code token} names {@code column}, otherwise 0. */
+    private static long unitBytes(String token, String column) {
+        if (token.equals("Blk_" + column)) {
+            return PAGE_SIZE;
+        } else if (token.equals("kB_" + column)) {
+            return KILOBYTE;
+        } else if (token.equals("MB_" + column)) {
+            return MEGABYTE;
+        }
+        return 0;
+    }
+
+    @Override
+    protected List<String> calculateInfo() throws IOException {
+        // Merge stderr into stdout so no unread stderr pipe is left open; close stdin (unused by iostat)
+        // and destroy the process afterwards so every call releases its descriptors.
+        Process process = new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
+        try (InputStream inputStream = process.getInputStream()) {
+            process.getOutputStream().close();
+            return IOUtils.readLines(inputStream, Charset.defaultCharset());
+        } finally {
+            process.destroy();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterLinux.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterLinux.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterLinux.java
deleted file mode 100644
index 3db4c8c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterLinux.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.control.nc.io.profiling;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-public class IOCounterLinux implements IIOCounter {
-    public static final String COMMAND = "iostat";
-    public static final String STATFILE = "/proc/self/io";
-    public static final int PAGE_SIZE = 4096;
-
-    private long baseReads = 0;
-    private long baseWrites = 0;
-
-    public IOCounterLinux() {
-        baseReads = getReads();
-        baseWrites = getWrites();
-    }
-
-    @Override
-    public long getReads() {
-        try {
-            long reads = extractRow(4);
-            return reads;
-        } catch (IOException e) {
-            try {
-                long reads = extractColumn(4) * PAGE_SIZE;
-                return reads - baseReads;
-            } catch (IOException e2) {
-                return 0;
-            }
-        }
-    }
-
-    @Override
-    public long getWrites() {
-        try {
-            long writes = extractRow(5);
-            long cancelledWrites = extractRow(6);
-            return (writes - cancelledWrites);
-        } catch (IOException e) {
-            try {
-                long writes = extractColumn(5) * PAGE_SIZE;
-                return writes - baseWrites;
-            } catch (IOException e2) {
-                return 0;
-            }
-        }
-    }
-
-    private long extractColumn(int columnIndex) throws IOException {
-        BufferedReader reader = exec(COMMAND);
-        String line = null;
-        boolean device = false;
-        long ios = 0;
-        while ((line = reader.readLine()) != null) {
-            if (line.contains("Blk_read")) {
-                device = true;
-                continue;
-            }
-            if (device == true) {
-                StringTokenizer tokenizer = new StringTokenizer(line);
-                int i = 0;
-                while (tokenizer.hasMoreTokens()) {
-                    String column = tokenizer.nextToken();
-                    if (i == columnIndex) {
-                        ios += Long.parseLong(column);
-                        break;
-                    }
-                    i++;
-                }
-            }
-        }
-        reader.close();
-        return ios;
-    }
-
-    private long extractRow(int rowIndex) throws IOException {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(STATFILE)));
-        String line = null;
-        long ios = 0;
-        int i = 0;
-        while ((line = reader.readLine()) != null) {
-            if (i == rowIndex) {
-                StringTokenizer tokenizer = new StringTokenizer(line);
-                int j = 0;
-                while (tokenizer.hasMoreTokens()) {
-                    String column = tokenizer.nextToken();
-                    if (j == 1) {
-                        ios = Long.parseLong(column);
-                        break;
-                    }
-                    j++;
-                }
-            }
-            i++;
-        }
-        reader.close();
-        return ios;
-    }
-
-    private BufferedReader exec(String command) throws IOException {
-        Process p = Runtime.getRuntime().exec(command);
-        return new BufferedReader(new InputStreamReader(p.getInputStream()));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java
index 729157b..a48d55d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterOSX.java
@@ -19,16 +19,18 @@
 
 package org.apache.hyracks.control.nc.io.profiling;
 
+import static org.apache.hyracks.control.nc.io.profiling.IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+
 public class IOCounterOSX implements IIOCounter {
 
     @Override
     public long getReads() {
-        return 0;
+        return IO_COUNTER_UNAVAILABLE; // no read counter is implemented for macOS; always unavailable
     }
 
     @Override
     public long getWrites() {
-        return 0;
+        return IO_COUNTER_UNAVAILABLE; // no write counter is implemented for macOS; always unavailable
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/94b6da6a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterProc.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterProc.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterProc.java
new file mode 100644
index 0000000..8882271
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterProc.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.io.profiling;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link IIOCounter} backed by the {@code read_bytes}, {@code write_bytes} and
+ * {@code cancelled_write_bytes} fields of {@code /proc/self/io}. Values are the cumulative totals
+ * reported by that file; no baseline is subtracted.
+ */
+public class IOCounterProc extends IOCounterCache<List<String>> {
+    private static final Logger LOGGER = LogManager.getLogger();
+    @SuppressWarnings("squid:S1075") // hardcoded URI
+    public static final File STATFILE = new File("/proc/self/io");
+    private static final String READ_BYTES = "read_bytes:";
+    private static final String WRITE_BYTES = "write_bytes:";
+    private static final String CANCELLED_WRITE_BYTES = "cancelled_write_bytes:";
+    private long failureCount;
+
+    @Override
+    public long getReads() {
+        try {
+            return extractField(getInfo(), READ_BYTES);
+        } catch (Exception e) {
+            LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting reads", e);
+            return IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+        }
+    }
+
+    @Override
+    public long getWrites() {
+        try {
+            List<String> rows = getInfo();
+            long writes = extractField(rows, WRITE_BYTES);
+            long cancelledWrites = extractField(rows, CANCELLED_WRITE_BYTES);
+            return writes - cancelledWrites;
+        } catch (Exception e) {
+            LOGGER.log(failureCount++ > 0 ? Level.DEBUG : Level.WARN, "Failure getting writes", e);
+            return IOCounterDefault.IO_COUNTER_UNAVAILABLE;
+        }
+    }
+
+    /**
+     * Returns the numeric value of the line of the form {@code "<key> <value>"} whose first token equals
+     * {@code key}. The field is looked up by name rather than by line position so that the lookup does
+     * not depend on the order of the lines in the file.
+     *
+     * @throws IllegalStateException if no line starts with {@code key}
+     * @throws NumberFormatException if the value is not a valid long
+     */
+    private static long extractField(List<String> rows, String key) {
+        for (String row : rows) {
+            String[] fields = StringUtils.split(row, ' ');
+            if (fields.length >= 2 && key.equals(fields[0])) {
+                return Long.parseLong(fields[1]);
+            }
+        }
+        throw new IllegalStateException("Field '" + key + "' not found in " + STATFILE);
+    }
+
+    @Override
+    protected List<String> calculateInfo() throws IOException {
+        return FileUtils.readLines(STATFILE, Charset.defaultCharset());
+    }
+
+}


[4/4] asterixdb git commit: Merge commit '94b6da6ae33749dfdee35e35f32c165693015e3d' from release-0.9.4-pre-rc

Posted by mb...@apache.org.
Merge commit '94b6da6ae33749dfdee35e35f32c165693015e3d' from release-0.9.4-pre-rc

Change-Id: I73edc93888a80687416cc80014ffebc2683c68f6


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ce675542
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ce675542
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ce675542

Branch: refs/heads/master
Commit: ce67554220164a9aab55bc1498d3b0559c498217
Parents: 2ebd0d4 94b6da6
Author: Michael Blow <mb...@apache.org>
Authored: Fri Mar 23 01:21:04 2018 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Fri Mar 23 01:21:04 2018 -0400

----------------------------------------------------------------------
 .../hyracks/bootstrap/NCApplication.java        |   1 +
 .../asterix/test/common/TestExecutor.java       |  14 +-
 .../hyracks/control/cc/cluster/NodeManager.java |  15 +--
 .../control/nc/NodeControllerService.java       |  19 +--
 .../org/apache/hyracks/control/nc/Task.java     |  17 ++-
 .../control/nc/io/profiling/IIOCounter.java     |   8 +-
 .../control/nc/io/profiling/IOCounterCache.java |  41 ++++++
 .../nc/io/profiling/IOCounterDefault.java       |   6 +-
 .../nc/io/profiling/IOCounterFactory.java       |   9 +-
 .../nc/io/profiling/IOCounterIoStat.java        | 101 +++++++++++++++
 .../control/nc/io/profiling/IOCounterLinux.java | 128 -------------------
 .../control/nc/io/profiling/IOCounterOSX.java   |   6 +-
 .../control/nc/io/profiling/IOCounterProc.java  |  71 ++++++++++
 .../control/nc/work/AbortAllJobsWork.java       |  10 +-
 .../nc/work/EnsureAllCcTasksCompleted.java      | 102 +++++++++++++++
 .../java/org/apache/hyracks/util/ExitUtil.java  |   1 +
 .../main/java/org/apache/hyracks/util/Span.java |  12 +-
 17 files changed, 391 insertions(+), 170 deletions(-)
----------------------------------------------------------------------



[2/4] asterixdb git commit: [NO ISSUE][TEST] Add timeout multiplier for test executor timeouts

Posted by mb...@apache.org.
[NO ISSUE][TEST] Add timeout multiplier for test executor timeouts

Change-Id: I340098ce840f31f2afcef5dd66e43a21ed7ceb9d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2513
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Tested-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2587f1db
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2587f1db
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2587f1db

Branch: refs/heads/master
Commit: 2587f1dbe58f7fa65163d141eed1c40aee3fdb1a
Parents: a77ec69
Author: Michael Blow <mb...@apache.org>
Authored: Thu Mar 22 17:09:16 2018 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Thu Mar 22 17:22:58 2018 -0700

----------------------------------------------------------------------
 .../org/apache/asterix/test/common/TestExecutor.java  | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2587f1db/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 8c0a3d4..80048bd 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -154,6 +154,7 @@ public class TestExecutor {
     protected int endpointSelector;
     protected IExternalUDFLibrarian librarian;
     private Map<File, TestLoop> testLoops = new HashMap<>();
+    private double timeoutMultiplier = 1;
 
     public TestExecutor() {
         this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
@@ -183,6 +184,10 @@ public class TestExecutor {
         this.replicationAddress = replicationAddress;
     }
 
+    public void setTimeoutMultiplier(double timeoutMultiplier) { // scales poll/cluster-state/port-wait timeouts
+        this.timeoutMultiplier = timeoutMultiplier; // NOTE(review): values <= 0 yield zero/negative timeouts
+    }
+
     /**
      * Probably does not work well with symlinks.
      */
@@ -1449,10 +1454,10 @@ public class TestExecutor {
         return false;
     }
 
-    public static int getTimeoutSecs(String statement) {
+    public int getTimeoutSecs(String statement) {
         final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
         if (timeoutMatcher.find()) {
-            return Integer.parseInt(timeoutMatcher.group(1));
+            return (int) (Integer.parseInt(timeoutMatcher.group(1)) * timeoutMultiplier);
         } else {
             throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
         }
@@ -1796,7 +1801,8 @@ public class TestExecutor {
         waitForClusterState("ACTIVE", timeoutSecs, timeUnit);
     }
 
-    public void waitForClusterState(String desiredState, int timeout, TimeUnit timeUnit) throws Exception {
+    public void waitForClusterState(String desiredState, int baseTimeout, TimeUnit timeUnit) throws Exception {
+        int timeout = (int) (baseTimeout * timeoutMultiplier);
         LOGGER.info("Waiting for cluster state " + desiredState + "...");
         Thread t = new Thread(() -> {
             while (true) {
@@ -1874,7 +1880,7 @@ public class TestExecutor {
         }
         String host = command[0];
         int port = Integer.parseInt(command[1]);
-        int timeoutSec = Integer.parseInt(command[2]);
+        int timeoutSec = (int) (Integer.parseInt(command[2]) * timeoutMultiplier);
         while (isPortActive(host, port)) {
             TimeUnit.SECONDS.sleep(1);
             timeoutSec--;