You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/10/06 13:31:49 UTC

[ignite-teamcity-bot] branch ignite-9800 updated: IGNITE-9800: Scheduler implemented

This is an automated email from the ASF dual-hosted git repository.

dpavlov pushed a commit to branch ignite-9800
in repository https://gitbox.apache.org/repos/asf/ignite-teamcity-bot.git


The following commit(s) were added to refs/heads/ignite-9800 by this push:
     new b09e4fa  IGNITE-9800: Scheduler implemented
b09e4fa is described below

commit b09e4fae0d792178ee30fe9084adeaded862f84b
Author: Dmitriy Pavlov <dp...@apache.org>
AuthorDate: Sat Oct 6 16:31:47 2018 +0300

    IGNITE-9800: Scheduler implemented
---
 .../org/apache/ignite/ci/di/MonitoredTask.java     |   3 +
 .../apache/ignite/ci/di/scheduler/IScheduler.java  |   2 +
 .../apache/ignite/ci/di/scheduler/NamedTask.java   | 155 +++++++++++++++++++++
 .../ignite/ci/di/scheduler/SchedulerModule.java    |  17 +++
 .../ignite/ci/di/scheduler/TcBotScheduler.java     |  35 ++++-
 .../org/apache/ignite/ci/github/GitHubBranch.java  |  33 +++++
 .../org/apache/ignite/ci/github/GitHubUser.java    |   2 +-
 .../ci/github/ignited/GitHubConnIgnitedImpl.java   |  16 ++-
 8 files changed, 256 insertions(+), 7 deletions(-)

diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/MonitoredTask.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/MonitoredTask.java
index 8b235e2..a5a1575 100644
--- a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/MonitoredTask.java
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/MonitoredTask.java
@@ -23,6 +23,9 @@ import java.lang.annotation.Target;
 
 @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
 public @interface MonitoredTask {
+    /**
+     * @return Display name for monitoring page.
+     */
     String name() default "";
 
     /**
diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/IScheduler.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/IScheduler.java
index aa7f237..79836d8 100644
--- a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/IScheduler.java
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/IScheduler.java
@@ -34,5 +34,7 @@ public interface IScheduler {
      */
     public void invokeLater(Runnable cmd, long delay, TimeUnit unit);
 
+    public void sheduleNamed(String fullName, Runnable cmd, long queitPeriod, TimeUnit unit);
+
     public void stop();
 }
diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/NamedTask.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/NamedTask.java
new file mode 100644
index 0000000..ef634ee
--- /dev/null
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/NamedTask.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ci.di.scheduler;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.StampedLock;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+class NamedTask { // Per-name holder of the latest requested command; decides when that command may run.
+    private final StampedLock lock = new StampedLock(); // Guards the mutable fields annotated with @GuardedBy below.
+    private final String name; // Assigned in the constructor; not read anywhere inside this class.
+    /**
+     * Most recently registered command which has not been taken for execution yet; {@code null} if none.
+     */
+    @GuardedBy("lock")
+    private volatile Runnable cmd;
+    /**
+     * {@link System#currentTimeMillis()} value recorded when the last run finished; 0 until the first run ends.
+     */
+    @GuardedBy("lock")
+    private volatile long lastFinishedTs = 0;
+    /**
+     * Current lifecycle state of the task.
+     */
+    @GuardedBy("lock")
+    private volatile Status status = Status.CREATED;
+    /**
+     * Quiet period in milliseconds: a finished run is treated as fresh for this long. 0 means not set yet;
+     * once set, later requests are merged in with {@link Math#min(long, long)}.
+     */
+    @GuardedBy("lock")
+    private volatile long resValidityMs = 0;
+    /**
+     * Task lifecycle: {@code CREATED} is the initial state, {@code RUNNING} is set when a command is taken for
+     * execution, {@code COMPLETED} is set after the command returned or threw.
+     */
+    enum Status {
+        CREATED, RUNNING, COMPLETED ;
+    }
+    /**
+     * @param name Task name.
+     */
+    public NamedTask(String name) {
+        this.name = name;
+    }
+    /**
+     * Registers a command to be executed by a later {@link #needRun()} call and records the requested quiet period.
+     * <p>
+     * The request is dropped without taking the write lock when a validated optimistic read shows that the task
+     * is running and the stored quiet period is non-zero and not longer than the requested one. Otherwise the
+     * command replaces any pending one and the stored period becomes the smaller of the stored and the requested
+     * value (or the requested value if nothing was stored).
+     *
+     * @param cmd Command to run.
+     * @param period Quiet period: for how long the result of a finished run is considered fresh.
+     * @param unit Time unit of {@code period}.
+     */
+    public void sheduleWithQuitePeriod(@Nonnull Runnable cmd, long period, TimeUnit unit) {
+        long resValidityMs = unit.toMillis(period);
+
+        boolean canSkip = false;
+        // Fast path: read the state without blocking; the stamp is checked by validate() below.
+        long optReadStamp = lock.tryOptimisticRead();
+
+        if (status == Status.RUNNING)
+            canSkip = true;
+        // No period stored yet, or the stored one is longer than requested: the request has to be recorded.
+        if (this.resValidityMs == 0 || this.resValidityMs > resValidityMs)
+            canSkip = false;
+        // The values read above can be trusted only if no write lock was taken in the meantime.
+        boolean optRead = lock.validate(optReadStamp);
+
+        if (optRead && canSkip)
+            return;
+
+        long writeLockStamp = lock.writeLock();
+        try {
+            this.cmd = cmd;
+            if (this.resValidityMs != 0)
+                this.resValidityMs = Math.min(this.resValidityMs, resValidityMs);
+            else
+                this.resValidityMs = resValidityMs;
+        }
+        finally {
+            lock.unlock(writeLockStamp);
+        }
+
+    }
+    /**
+     * Runs the pending command in the calling thread if one is registered and it is time to run it.
+     * <p>
+     * Whether the run can be skipped is decided by {@link #canSkipStartNow()}, first under an optimistic read and,
+     * if that read was invalidated, again under a read lock. The command is then taken (and cleared) under the
+     * write lock. After it finishes, normally or by exception, the finish time and {@link Status#COMPLETED} are
+     * recorded. An exception thrown by the command propagates to the caller of this method.
+     *
+     * @return The command that has been executed, or {@code null} if nothing was run.
+     */
+    public Runnable needRun() {
+        long optReadStamp = lock.tryOptimisticRead();
+        boolean canSkip = canSkipStartNow();
+        boolean optRead = lock.validate(optReadStamp);
+
+        if (optRead) {
+            if (canSkip)
+                return null;
+        }
+        else {
+            long readStamp = lock.readLock(); // Optimistic read was invalidated: repeat the check under a read lock.
+            boolean canSkipStartNow;
+
+            try {
+                canSkipStartNow = canSkipStartNow();
+            }
+            finally {
+                lock.unlockRead(readStamp);
+            }
+
+            if (canSkipStartNow)
+                return null;
+        }
+        // Take the pending command and mark the task as running in one write-locked step.
+        Runnable cmd;
+        long writeLockStamp = lock.writeLock();
+        try {
+            cmd = this.cmd;
+            this.cmd = null;
+
+            // The write lock is acquired separately (not upgraded from the read above), so the pending command
+            // may already have been taken by another caller and cmd can be null here.
+            if (cmd != null)
+                status = Status.RUNNING;
+        }
+        finally {
+            lock.unlock(writeLockStamp);
+        }
+
+        if (cmd == null)
+            return null;
+        // The command runs with no lock held; the finally block records completion even if it throws.
+        try {
+            cmd.run();
+        }
+        finally {
+            long writeLockStamp2 = lock.writeLock();
+            try {
+                lastFinishedTs = System.currentTimeMillis();
+                status = Status.COMPLETED;
+            }
+            finally {
+                lock.unlock(writeLockStamp2);
+            }
+        }
+
+        return cmd;
+    }
+    /**
+     * Evaluates the current state without acquiring any lock itself; {@link #needRun()} calls it inside an
+     * optimistic read or under a read lock.
+     *
+     * @return {@code true} if the task is running now, or it has completed and either no new command is pending
+     * or the last run finished less than the quiet period ago; {@code false} otherwise, which includes a task
+     * that is still in the {@code CREATED} state.
+     */
+    public boolean canSkipStartNow() {
+        boolean canSkip = false;
+        if (status == Status.RUNNING)
+            canSkip = true;
+
+        if (status == Status.COMPLETED) {
+            if (cmd == null)
+                canSkip = true; // No one asked to run
+
+            if (lastFinishedTs != 0 && (System.currentTimeMillis() - lastFinishedTs) < resValidityMs) {
+                //result is still fresh
+                canSkip = true;
+            }
+        }
+        return canSkip;
+    }
+
+}
diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/SchedulerModule.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/SchedulerModule.java
index 69a310b..6379fbd 100644
--- a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/SchedulerModule.java
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/SchedulerModule.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.ci.di.scheduler;
 
 import com.google.inject.AbstractModule;
diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/TcBotScheduler.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/TcBotScheduler.java
index 941d3ab..f3d2c8e 100644
--- a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/TcBotScheduler.java
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/di/scheduler/TcBotScheduler.java
@@ -17,11 +17,14 @@
 package org.apache.ignite.ci.di.scheduler;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.ci.di.MonitoredTask;
 
 class TcBotScheduler implements IScheduler {
     /** Initial guard. */
@@ -30,13 +33,41 @@ class TcBotScheduler implements IScheduler {
     /** Executor service. */
     private volatile ScheduledExecutorService executorSvc;
 
+    /** Submit named task checker guard. */
+    private AtomicBoolean tickGuard = new AtomicBoolean();
+
+    private final ConcurrentMap<String, NamedTask> namedTasks = new ConcurrentHashMap<>();
+
     @Override public void invokeLater(Runnable cmd, long delay, TimeUnit unit) {
         service().schedule(cmd, delay, unit);
     }
 
+    @Override public void sheduleNamed(String fullName, Runnable cmd, long queitPeriod, TimeUnit unit) {
+        NamedTask task = namedTasks.computeIfAbsent(fullName, NamedTask::new);
+
+        task.sheduleWithQuitePeriod(cmd, queitPeriod, unit);
+
+        if (tickGuard.compareAndSet(false, true))
+            service().scheduleAtFixedRate(this::checkNamedTasks, 0, 1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Periodic tick: gives every registered named task a chance to run and counts the tasks which actually ran.
+     * <p>
+     * A failure of one task is isolated from the others and from the schedule itself. This method is executed
+     * through {@code scheduleAtFixedRate}, which suppresses all subsequent executions once a run throws, and the
+     * tick guard is never reset, so a propagated exception would stop all named tasks permanently.
+     *
+     * @return Summary of the tick: number of tasks run, plus the number of failed tasks if there were any.
+     */
+    @MonitoredTask(name = "Run Named Scheduled Tasks")
+    protected String checkNamedTasks() {
+        AtomicInteger started = new AtomicInteger();
+        AtomicInteger failed = new AtomicInteger();
+
+        namedTasks.forEach((s, task) -> {
+            try {
+                Runnable runnable = task.needRun();
+
+                if (runnable != null)
+                    started.incrementAndGet();
+            }
+            catch (RuntimeException e) {
+                failed.incrementAndGet();
+
+                // Hand the failure to the uncaught-exception handler of the current thread, so that it is
+                // reported instead of being silently dropped, and continue with the remaining tasks.
+                Thread thread = Thread.currentThread();
+
+                thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
+            }
+        });
+
+        int failedCnt = failed.get();
+
+        return "Started " + started.get() + (failedCnt == 0 ? "" : ", failed " + failedCnt);
+    }
+
     /**
      * {@inheritDoc}
      * <p>
      * Calls {@code shutdown()} on the executor service when the field is non-null; otherwise does nothing.
      */
     @Override public void stop() {
-        if(executorSvc!=null)
+        if (executorSvc != null)
             executorSvc.shutdown();
     }
 
diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubBranch.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubBranch.java
index e1ff4d6..a4b3dea 100644
--- a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubBranch.java
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubBranch.java
@@ -1,7 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.ignite.ci.github;
 
+import com.google.common.base.Objects;
+
 public class GitHubBranch {
     private String label;
     private String ref;
     private String sha;
+
+    /**
+     * Two branches are equal when they are of exactly the same class and their label, ref and sha are equal
+     * (null-safe comparison).
+     */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || o.getClass() != getClass())
+            return false;
+
+        GitHubBranch that = (GitHubBranch)o;
+
+        if (!Objects.equal(label, that.label))
+            return false;
+
+        if (!Objects.equal(ref, that.ref))
+            return false;
+
+        return Objects.equal(sha, that.sha);
+    }
+
+    /**
+     * Hash code built from the same three fields which {@link #equals(Object)} compares.
+     */
+    @Override public int hashCode() {
+        return Objects.hashCode(label, ref, sha);
+    }
 }
diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubUser.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubUser.java
index 34d0dee..3157acc 100644
--- a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubUser.java
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/GitHubUser.java
@@ -20,7 +20,7 @@ import com.google.common.base.Objects;
 import com.google.gson.annotations.SerializedName;
 
 public class GitHubUser {
-    private String login;
+    @SerializedName("login") private String login;
     @SerializedName("avatar_url") private String avatarUrl;
     /*See full example in prsList.json */
 
diff --git a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/ignited/GitHubConnIgnitedImpl.java b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/ignited/GitHubConnIgnitedImpl.java
index 9660b38..ff95b68 100644
--- a/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/ignited/GitHubConnIgnitedImpl.java
+++ b/ignite-tc-helper-web/src/main/java/org/apache/ignite/ci/github/ignited/GitHubConnIgnitedImpl.java
@@ -80,7 +80,8 @@ class GitHubConnIgnitedImpl implements IGitHubConnIgnited {
     /** {@inheritDoc} */
     @AutoProfiling
     @Override public List<PullRequest> getPullRequests() {
-        scheduler.invokeLater(this::actualizePrs, 10, TimeUnit.SECONDS);
+        scheduler.sheduleNamed(IGitHubConnIgnited.class.getSimpleName() + ".actualizePrs",
+            this::actualizePrs, 2, TimeUnit.MINUTES);
 
         return StreamSupport.stream(prCache.spliterator(), false)
             .filter(entry -> entry.getKey() >> 32 == srvIdMaskHigh)
@@ -108,6 +109,12 @@ class GitHubConnIgnitedImpl implements IGitHubConnIgnited {
     protected String runAtualizePrs(String srvId) {
         List<PullRequest> ghData = conn.getPullRequests();
 
+        int size = saveChunk(ghData);
+
+        return "Entries saved " + size;
+    }
+
+    private int saveChunk(List<PullRequest> ghData) {
         Set<Long> ids = ghData.stream().map(PullRequest::getNumber)
             .map(this::prNumberToCacheKey)
             .collect(Collectors.toSet());
@@ -123,9 +130,10 @@ class GitHubConnIgnitedImpl implements IGitHubConnIgnited {
                 entriesToPut.put(cacheKey, next);
         }
 
-        prCache.putAll(entriesToPut);
-
-        return "Entries saved " + entriesToPut.size();
+        int size = entriesToPut.size();
+        if (size != 0)
+            prCache.putAll(entriesToPut);
+        return size;
     }
 
     private long prNumberToCacheKey(int prNumber) {