You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2019/09/10 07:27:16 UTC

svn commit: r1866727 - in /jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob: SameThreadExecutorService.java UploadStagingCache.java

Author: tomekr
Date: Tue Sep 10 07:27:16 2019
New Revision: 1866727

URL: http://svn.apache.org/viewvc?rev=1866727&view=rev
Log:
OAK-8612: Make the Azure Data Store compatible with Guava 15 and 26

Added:
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java
Modified:
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java

Added: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java?rev=1866727&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java (added)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java Tue Sep 10 07:27:16 2019
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import com.google.common.util.concurrent.AbstractListeningExecutorService;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Executor that runs each task synchronously in the calling thread. Copied from Guava 15
+ * so that the AzureDataStore also works with Guava 26, where this class is not present.
+ *
+ * TODO: Remove this class once the whole of Oak has been migrated to Guava 26.
+ */
+class SameThreadExecutorService extends AbstractListeningExecutorService {
+    /**
+     * Lock held whenever the state variables (runningTasks, shutdown) of the
+     * executor are read or written; it also owns the termination condition.
+     */
+    private final Lock lock = new ReentrantLock();
+
+    /** Signaled only by endTask(), once shut down with no running tasks; shutdown() never signals */
+    private final Condition termination = lock.newCondition();
+
+    /*
+     * Conceptually, these two variables describe the executor being in
+     * one of three states:
+     *   - Active: shutdown == false
+     *   - Shutdown: runningTasks > 0 and shutdown == true
+     *   - Terminated: runningTasks == 0 and shutdown == true
+     */
+    private int runningTasks = 0;
+    private boolean shutdown = false;
+
+    @Override
+    public void execute(Runnable command) {
+        startTask();
+        try {
+            command.run();
+        } finally {
+            endTask();
+        }
+    }
+
+    @Override
+    public boolean isShutdown() {
+        lock.lock();
+        try {
+            return shutdown;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        lock.lock();
+        try {
+            shutdown = true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // Nothing is ever queued (see Guava's sameThreadExecutor javadoc), so this only calls shutdown().
+    @Override
+    public List<Runnable> shutdownNow() {
+        shutdown();
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        lock.lock();
+        try {
+            return shutdown && runningTasks == 0;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException {
+        long nanos = unit.toNanos(timeout);
+        lock.lock();
+        try {
+            for (;;) {
+                if (isTerminated()) {
+                    return true;
+                } else if (nanos <= 0) {
+                    return false;
+                } else {
+                    nanos = termination.awaitNanos(nanos);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Registers a task that is about to run: rejects it if the executor has
+     * been shut down, otherwise increments the running task count.
+     *
+     * @throws RejectedExecutionException if the executor has already been
+     *         shut down
+     */
+    private void startTask() {
+        lock.lock();
+        try {
+            if (isShutdown()) {
+                throw new RejectedExecutionException("Executor already shutdown");
+            }
+            runningTasks++;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Decrements the running task count and signals awaitTermination() waiters once terminated.
+     */
+    private void endTask() {
+        lock.lock();
+        try {
+            runningTasks--;
+            if (isTerminated()) {
+                termination.signalAll();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+}

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1866727&r1=1866726&r2=1866727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Tue Sep 10 07:27:16 2019
@@ -21,6 +21,8 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -31,9 +33,9 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
 import com.google.common.cache.Weigher;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -44,7 +46,6 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
 import org.apache.jackrabbit.oak.commons.StringUtils;
@@ -237,15 +238,18 @@ public class UploadStagingCache implemen
         // Move any older cache pending uploads
         movePendingUploadsToStaging(home, rootPath, true);
 
-        Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace)
-            .filter(new Predicate<File>() {
-                @Override public boolean apply(File input) {
-                    return input.isFile();
-                }
-            }).iterator();
+        List<File> files;
+        try {
+            uploadCacheSpace.mkdirs();
+            files = java.nio.file.Files.find(uploadCacheSpace.toPath(), Integer.MAX_VALUE, (path, basicFileAttributes) -> basicFileAttributes.isRegularFile())
+                    .map(Path::toFile)
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
         int count = 0;
-        while (iter.hasNext()) {
-            File toBeSyncedFile = iter.next();
+        for (File toBeSyncedFile : files) {
             Optional<SettableFuture<Integer>> scheduled =
                 putOptionalDisregardingSize(toBeSyncedFile.getName(), toBeSyncedFile, true);
             if (scheduled.isPresent()) {
@@ -403,7 +407,7 @@ public class UploadStagingCache implemen
                     result.setException(t);
                     retryQueue.add(id);
                 }
-            });
+            }, new SameThreadExecutorService());
             LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
         } catch (Exception e) {
             LOG.error("Error staging file for upload [{}]", upload, e);
@@ -411,6 +415,7 @@ public class UploadStagingCache implemen
         return result;
     }
 
+
     /**
      * Invalidate called externally.
      * @param key to invalidate