You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2019/09/10 07:27:16 UTC
svn commit: r1866727 - in
/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob:
SameThreadExecutorService.java UploadStagingCache.java
Author: tomekr
Date: Tue Sep 10 07:27:16 2019
New Revision: 1866727
URL: http://svn.apache.org/viewvc?rev=1866727&view=rev
Log:
OAK-8612: Make the Azure Data Store compatible with Guava 15 and 26
Added:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
Added: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java?rev=1866727&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java (added)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java Tue Sep 10 07:27:16 2019
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import com.google.common.util.concurrent.AbstractListeningExecutorService;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Class copied from the Guava 15, to make the AzureDataStore compatible with
+ * the Guava 26 (where the SameThreadExecutorService is not present).
+ *
+ * TODO: Remove this class once the whole Oak is migrated to use Guava 26.
+ */
+class SameThreadExecutorService extends AbstractListeningExecutorService {
+ /**
+ * Lock used whenever accessing the state variables
+ * (runningTasks, shutdown, terminationCondition) of the executor
+ */
+ private final Lock lock = new ReentrantLock();
+
+ /** Signaled after the executor is shutdown and running tasks are done */
+ private final Condition termination = lock.newCondition();
+
+ /*
+ * Conceptually, these two variables describe the executor being in
+ * one of three states:
+ * - Active: shutdown == false
+ * - Shutdown: runningTasks > 0 and shutdown == true
+ * - Terminated: runningTasks == 0 and shutdown == true
+ */
+ private int runningTasks = 0;
+ private boolean shutdown = false;
+
+ @Override
+ public void execute(Runnable command) {
+ startTask();
+ try {
+ command.run();
+ } finally {
+ endTask();
+ }
+ }
+
+ @Override
+ public boolean isShutdown() {
+ lock.lock();
+ try {
+ return shutdown;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ lock.lock();
+ try {
+ shutdown = true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // See sameThreadExecutor javadoc for unusual behavior of this method.
+ @Override
+ public List<Runnable> shutdownNow() {
+ shutdown();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ lock.lock();
+ try {
+ return shutdown && runningTasks == 0;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ lock.lock();
+ try {
+ for (;;) {
+ if (isTerminated()) {
+ return true;
+ } else if (nanos <= 0) {
+ return false;
+ } else {
+ nanos = termination.awaitNanos(nanos);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Checks if the executor has been shut down and increments the running
+ * task count.
+ *
+ * @throws RejectedExecutionException if the executor has been previously
+ * shutdown
+ */
+ private void startTask() {
+ lock.lock();
+ try {
+ if (isShutdown()) {
+ throw new RejectedExecutionException("Executor already shutdown");
+ }
+ runningTasks++;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Decrements the running task count.
+ */
+ private void endTask() {
+ lock.lock();
+ try {
+ runningTasks--;
+ if (isTerminated()) {
+ termination.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1866727&r1=1866726&r2=1866727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Tue Sep 10 07:27:16 2019
@@ -21,6 +21,8 @@ package org.apache.jackrabbit.oak.plugin
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -31,9 +33,9 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
import com.google.common.cache.Weigher;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -44,7 +46,6 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.apache.jackrabbit.oak.commons.StringUtils;
@@ -237,15 +238,18 @@ public class UploadStagingCache implemen
// Move any older cache pending uploads
movePendingUploadsToStaging(home, rootPath, true);
- Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace)
- .filter(new Predicate<File>() {
- @Override public boolean apply(File input) {
- return input.isFile();
- }
- }).iterator();
+ List<File> files;
+ try {
+ uploadCacheSpace.mkdirs();
+ files = java.nio.file.Files.find(uploadCacheSpace.toPath(), Integer.MAX_VALUE, (path, basicFileAttributes) -> basicFileAttributes.isRegularFile())
+ .map(Path::toFile)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
int count = 0;
- while (iter.hasNext()) {
- File toBeSyncedFile = iter.next();
+ for (File toBeSyncedFile : files) {
Optional<SettableFuture<Integer>> scheduled =
putOptionalDisregardingSize(toBeSyncedFile.getName(), toBeSyncedFile, true);
if (scheduled.isPresent()) {
@@ -403,7 +407,7 @@ public class UploadStagingCache implemen
result.setException(t);
retryQueue.add(id);
}
- });
+ }, new SameThreadExecutorService());
LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
} catch (Exception e) {
LOG.error("Error staging file for upload [{}]", upload, e);
@@ -411,6 +415,7 @@ public class UploadStagingCache implemen
return result;
}
+
/**
* Invalidate called externally.
* @param key to invalidate