You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/24 02:08:31 UTC

asterixdb git commit: [ASTERIXDB-1706][RT] Use System.nanoTime For Result Timestamp

Repository: asterixdb
Updated Branches:
  refs/heads/master 2d059fd1c -> 9d0e21cea


[ASTERIXDB-1706][RT] Use System.nanoTime For Result Timestamp

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Use System.nanoTime for result timestamps so that
  results are not incorrectly swept when the system
  wall-clock time (System.currentTimeMillis) is adjusted.
- Move the expired-result sweep logic from
  ResultStateSweeper to AbstractDatasetManager.

Change-Id: I388d2a477bcfdc47d11dc6a4873483b82c9fadbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2315
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9d0e21ce
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9d0e21ce
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9d0e21ce

Branch: refs/heads/master
Commit: 9d0e21cea4d431933bbd9ba3819b9802a1e4c0fc
Parents: 2d059fd
Author: Murtadha Hubail <mh...@apache.org>
Authored: Wed Jan 24 02:52:33 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Tue Jan 23 18:07:58 2018 -0800

----------------------------------------------------------------------
 .../hyracks/api/dataset/DatasetJobRecord.java   |  2 +-
 .../hyracks/api/dataset/IDatasetManager.java    | 14 +++--
 .../cc/dataset/DatasetDirectoryService.java     | 18 +++----
 .../common/dataset/AbstractDatasetManager.java  | 55 ++++++++++++++++++++
 .../common/dataset/ResultStateSweeper.java      | 44 +++-------------
 .../nc/dataset/DatasetPartitionManager.java     | 18 ++-----
 .../control/nc/dataset/ResultSetMap.java        |  2 +-
 7 files changed, 83 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 55f1d7c..4e7ddda 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -84,7 +84,7 @@ public class DatasetJobRecord implements IDatasetStateRecord {
     private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
 
     public DatasetJobRecord() {
-        this.timestamp = System.currentTimeMillis();
+        this.timestamp = System.nanoTime();
         this.status = new Status();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
index c8463d3..a0c1f78 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
@@ -24,11 +24,15 @@ import org.apache.hyracks.api.job.JobId;
 
 public interface IDatasetManager {
 
-    public Set<JobId> getJobIds();
+    Set<JobId> getJobIds();
 
-    public IDatasetStateRecord getState(JobId jobId);
+    IDatasetStateRecord getState(JobId jobId);
 
-    public void deinitState(JobId jobId);
+    void sweep(JobId jobId);
 
-    public long getResultTimestamp(JobId jobId);
-}
+    /**
+     * Removes all references and deletes persisted files for
+     * all expired datasets.
+     */
+    void sweepExpiredDatasets();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index a57baf5..04aaddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.logging.log4j.Level;
@@ -53,25 +54,23 @@ import org.apache.logging.log4j.Logger;
  * the job (after it receives all the results) completely. Then we can just get rid of the location information for that
  * job.
  */
-public class DatasetDirectoryService implements IDatasetDirectoryService {
+public class DatasetDirectoryService extends AbstractDatasetManager implements IDatasetDirectoryService {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private final long resultTTL;
-
     private final long resultSweepThreshold;
 
     private final Map<JobId, JobResultInfo> jobResultLocations;
 
     public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
-        this.resultTTL = resultTTL;
+        super(resultTTL);
         this.resultSweepThreshold = resultSweepThreshold;
-        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
+        jobResultLocations = new LinkedHashMap<>();
     }
 
     @Override
     public void init(ExecutorService executor) {
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
+        executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
     }
 
     @Override
@@ -181,12 +180,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
     }
 
     @Override
-    public synchronized long getResultTimestamp(JobId jobId) {
-        return getState(jobId).getTimestamp();
-    }
-
-    @Override
-    public synchronized void deinitState(JobId jobId) {
+    public synchronized void sweep(JobId jobId) {
         jobResultLocations.remove(jobId);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
new file mode 100644
index 0000000..f95229e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.dataset;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.dataset.IDatasetManager;
+import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class AbstractDatasetManager implements IDatasetManager {
+
+    private final long nanoResultTTL;
+
+    protected AbstractDatasetManager(long resultTTL) {
+        this.nanoResultTTL = TimeUnit.MILLISECONDS.toNanos(resultTTL);
+    }
+
+    @Override
+    public synchronized void sweepExpiredDatasets() {
+        final List<JobId> expiredDatasets = new ArrayList<>();
+        final long sweepTime = System.nanoTime();
+        for (JobId jobId : getJobIds()) {
+            final IDatasetStateRecord state = getState(jobId);
+            if (state != null && hasExpired(state, sweepTime, nanoResultTTL)) {
+                expiredDatasets.add(jobId);
+            }
+        }
+        for (JobId jobId : expiredDatasets) {
+            sweep(jobId);
+        }
+    }
+
+    private static boolean hasExpired(IDatasetStateRecord dataset, long currentTime, long ttl) {
+        return currentTime - dataset.getTimestamp() - ttl > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index a9ca771..901ec67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -19,12 +19,7 @@
 
 package org.apache.hyracks.control.common.dataset;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
 
 /**
@@ -33,53 +28,26 @@ import org.apache.logging.log4j.Logger;
 public class ResultStateSweeper implements Runnable {
 
     private final IDatasetManager datasetManager;
-
-    private final long resultTTL;
-
     private final long resultSweepThreshold;
-
     private final Logger logger;
 
-    private final List<JobId> toBeCollected;
-
-    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold,
-            Logger logger) {
+    public ResultStateSweeper(IDatasetManager datasetManager, long resultSweepThreshold, Logger logger) {
         this.datasetManager = datasetManager;
-        this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         this.logger = logger;
-        toBeCollected = new ArrayList<JobId>();
     }
 
     @Override
-    @SuppressWarnings("squid:S2142") // catch interrupted exception
     public void run() {
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 Thread.sleep(resultSweepThreshold);
-                sweep();
+                datasetManager.sweepExpiredDatasets();
+                logger.trace("Result state cleanup instance successfully completed.");
             } catch (InterruptedException e) {
-                logger.log(Level.WARN, "Result cleaner thread interrupted, shutting down.");
-                break; // the interrupt was explicit from another thread. This thread should shut down...
+                logger.warn("Result cleaner thread interrupted, shutting down.");
+                Thread.currentThread().interrupt();
             }
         }
     }
-
-    private void sweep() {
-        synchronized (datasetManager) {
-            toBeCollected.clear();
-            for (JobId jobId : datasetManager.getJobIds()) {
-                final long timestamp = datasetManager.getResultTimestamp(jobId);
-                if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) {
-                    toBeCollected.add(jobId);
-                }
-            }
-            for (JobId jobId : toBeCollected) {
-                datasetManager.deinitState(jobId);
-            }
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Result state cleanup instance successfully completed.");
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index d381a67..476aeae 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -26,11 +26,11 @@ import java.util.concurrent.Executor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -38,7 +38,7 @@ import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public class DatasetPartitionManager implements IDatasetPartitionManager {
+public class DatasetPartitionManager extends AbstractDatasetManager implements IDatasetPartitionManager {
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final NodeControllerService ncs;
@@ -55,6 +55,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
 
     public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
             long resultSweepThreshold) {
+        super(resultTTL);
         this.ncs = ncs;
         this.executor = executor;
         deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -65,7 +66,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
             datasetMemoryManager = null;
         }
         partitionResultStateMap = new LinkedHashMap<>();
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
+        executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
     }
 
     @Override
@@ -179,16 +180,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager {
     }
 
     @Override
-    public synchronized long getResultTimestamp(JobId jobId) {
-        IDatasetStateRecord r = getState(jobId);
-        if (r == null) {
-            return -1;
-        }
-        return r.getTimestamp();
-    }
-
-    @Override
-    public synchronized void deinitState(JobId jobId) {
+    public synchronized void sweep(JobId jobId) {
         deinit(jobId);
         partitionResultStateMap.remove(jobId);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
index 3957401..1a64a5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
@@ -37,7 +37,7 @@ class ResultSetMap implements IDatasetStateRecord, Serializable {
     private final HashMap<ResultSetId, ResultState[]> resultStateMap;
 
     ResultSetMap() {
-        timestamp = System.currentTimeMillis();
+        timestamp = System.nanoTime();
         resultStateMap = new HashMap<>();
     }