You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2017/09/05 17:20:36 UTC

atlas git commit: ATLAS-2101: Update Implementation to Eliminate Use of Stopwatch

Repository: atlas
Updated Branches:
  refs/heads/master 1dcc3073a -> 7eff37a6f


ATLAS-2101: Update Implementation to Eliminate Use of Stopwatch


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/7eff37a6
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/7eff37a6
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/7eff37a6

Branch: refs/heads/master
Commit: 7eff37a6f4e23f75b4fce75bbc3c77e3258a6c6d
Parents: 1dcc307
Author: ashutoshm <am...@hortonworks.com>
Authored: Tue Sep 5 10:20:27 2017 -0700
Committer: ashutoshm <am...@hortonworks.com>
Committed: Tue Sep 5 10:20:27 2017 -0700

----------------------------------------------------------------------
 .../database/idassigner/StandardIDPool.java     | 259 +++++++++++++++++++
 1 file changed, 259 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/7eff37a6/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java
new file mode 100644
index 0000000..6c7a086
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.thinkaurelius.titan.graphdb.database.idassigner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.thinkaurelius.titan.core.TitanException;
+import com.thinkaurelius.titan.core.attribute.Duration;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.IDAuthority;
+import com.thinkaurelius.titan.diskstorage.IDBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+
+public class StandardIDPool implements IDPool {
+
+    private static final Logger log =
+            LoggerFactory.getLogger(StandardIDPool.class);
+
+    private static final TimeUnit SCHEDULING_TIME_UNIT =
+            TimeUnit.MILLISECONDS; // TODO
+
+    private static final IDBlock ID_POOL_EXHAUSTION = new IDBlock() {
+        @Override
+        public long numIds() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getId(long index) {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    private static final IDBlock UNINITIALIZED_BLOCK = new IDBlock() {
+        @Override
+        public long numIds() {
+            return 0;
+        }
+
+        @Override
+        public long getId(long index) {
+            throw new ArrayIndexOutOfBoundsException(0);
+        }
+    };
+
+    private static final int RENEW_ID_COUNT = 100;
+
+    private final IDAuthority idAuthority;
+    private final long idUpperBound; //exclusive
+    private final int partition;
+    private final int idNamespace;
+
+    private final Duration renewTimeout;
+    private final double renewBufferPercentage;
+
+    private IDBlock currentBlock;
+    private long currentIndex;
+    private long renewBlockIndex;
+//    private long nextID;
+//    private long currentMaxID;
+//    private long renewBufferID;
+
+    private volatile IDBlock nextBlock;
+    private Future<?> idBlockFuture;
+    private final ThreadPoolExecutor exec;
+
+    private volatile boolean initialized;
+    private volatile boolean closed;
+
+    public StandardIDPool(IDAuthority idAuthority, int partition, int idNamespace, long idUpperBound, Duration renewTimeout, double renewBufferPercentage) {
+        Preconditions.checkArgument(idUpperBound > 0);
+        this.idAuthority = idAuthority;
+        Preconditions.checkArgument(partition>=0);
+        this.partition = partition;
+        Preconditions.checkArgument(idNamespace>=0);
+        this.idNamespace = idNamespace;
+        this.idUpperBound = idUpperBound;
+        Preconditions.checkArgument(!renewTimeout.isZeroLength(), "Renew-timeout must be positive");
+        this.renewTimeout = renewTimeout;
+        Preconditions.checkArgument(renewBufferPercentage>0.0 && renewBufferPercentage<=1.0,"Renew-buffer percentage must be in (0.0,1.0]");
+        this.renewBufferPercentage = renewBufferPercentage;
+
+        currentBlock = UNINITIALIZED_BLOCK;
+        currentIndex = 0;
+        renewBlockIndex = 0;
+
+        nextBlock = null;
+
+        // daemon=true would probably be fine too
+        exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
+                .setDaemon(false)
+                .setNameFormat("TitanID(" + partition + ")("+idNamespace+")[%d]")
+                .build());
+        //exec.allowCoreThreadTimeOut(false);
+        //exec.prestartCoreThread();
+        idBlockFuture = null;
+
+        initialized = false;
+        closed = false;
+    }
+
+    private void waitForIDRenewer() throws InterruptedException {
+
+        long start = swStart();
+        if (null != idBlockFuture) {
+            try {
+                idBlockFuture.get(renewTimeout.getLength(SCHEDULING_TIME_UNIT), SCHEDULING_TIME_UNIT);
+            } catch (ExecutionException e) {
+                String msg = String.format("ID block allocation on partition(%d)-namespace(%d) failed with an exception in %s",
+                        partition, idNamespace, swStop(start));
+                throw new TitanException(msg, e);
+            } catch (TimeoutException e) {
+                // Attempt to cancel the renewer
+                idBlockFuture.cancel(true);
+                String msg = String.format("ID block allocation on partition(%d)-namespace(%d) timed out in %s",
+                        partition, idNamespace, swStop(start));
+                throw new TitanException(msg, e);
+            } catch (CancellationException e) {
+                String msg = String.format("ID block allocation on partition(%d)-namespace(%d) was cancelled after %s",
+                        partition, idNamespace, swStop(start));
+                throw new TitanException(msg, e);
+            } finally {
+                idBlockFuture = null;
+            }
+            // Allow InterruptedException to propagate up the stack
+        }
+    }
+
+    private long swStop(long start) {
+        return swStart() - start;
+    }
+
+    private synchronized void nextBlock() throws InterruptedException {
+        assert currentIndex == currentBlock.numIds();
+        Preconditions.checkState(!closed,"ID Pool has been closed for partition(%s)-namespace(%s) - cannot apply for new id block",
+                partition,idNamespace);
+
+        waitForIDRenewer();
+        if (nextBlock == ID_POOL_EXHAUSTION)
+            throw new IDPoolExhaustedException("Exhausted ID Pool for partition(" + partition+")-namespace("+idNamespace+")");
+
+        Preconditions.checkArgument(nextBlock!=null);
+
+        currentBlock = nextBlock;
+        currentIndex = 0;
+
+        log.debug("ID partition({})-namespace({}) acquired block: [{}]", partition, idNamespace, currentBlock);
+
+        assert currentBlock.numIds()>0;
+
+        nextBlock = null;
+
+        assert RENEW_ID_COUNT>0;
+        renewBlockIndex = Math.max(0,currentBlock.numIds()-Math.max(RENEW_ID_COUNT, Math.round(currentBlock.numIds()*renewBufferPercentage)));
+        assert renewBlockIndex<currentBlock.numIds() && renewBlockIndex>=currentIndex;
+    }
+
+    private void renewBuffer() {
+        Preconditions.checkArgument(nextBlock == null, nextBlock);
+        try {
+            long start = swStart();
+            IDBlock idBlock = idAuthority.getIDBlock(partition, idNamespace, renewTimeout);
+            log.debug("Retrieved ID block from authority on partition({})-namespace({}) in {}", partition, idNamespace, swStop(start));
+            Preconditions.checkArgument(idBlock!=null && idBlock.numIds()>0);
+            nextBlock = idBlock;
+        } catch (BackendException e) {
+            throw new TitanException("Could not acquire new ID block from storage", e);
+        } catch (IDPoolExhaustedException e) {
+            nextBlock = ID_POOL_EXHAUSTION;
+        }
+    }
+
+    private long swStart() {
+        return System.currentTimeMillis();
+    }
+
+    @Override
+    public synchronized long nextID() {
+        assert currentIndex <= currentBlock.numIds();
+        if (!initialized) {
+            startNextIDAcquisition();
+            initialized = true;
+        }
+
+        if (currentIndex == currentBlock.numIds()) {
+            try {
+                nextBlock();
+            } catch (InterruptedException e) {
+                throw new TitanException("Could not renew id block due to interruption", e);
+            }
+        }
+
+        if (currentIndex == renewBlockIndex) {
+            startNextIDAcquisition();
+        }
+        long returnId = currentBlock.getId(currentIndex);
+        currentIndex++;
+        if (returnId >= idUpperBound) throw new IDPoolExhaustedException("Reached id upper bound of " + idUpperBound);
+        log.trace("partition({})-namespace({}) Returned id: {}", partition, idNamespace, returnId);
+        return returnId;
+    }
+
+    @Override
+    public synchronized void close() {
+        closed=true;
+        //Wait for renewer to finish -- call exec.shutdownNow() instead?
+        try {
+            waitForIDRenewer();
+        } catch (InterruptedException e) {
+            throw new TitanException("Interrupted while waiting for id renewer thread to finish", e);
+        }
+        exec.shutdownNow();
+    }
+
+    private void startNextIDAcquisition() {
+        Preconditions.checkArgument(idBlockFuture == null, idBlockFuture);
+        if (closed) return; //Don't renew anymore if closed
+        //Renew buffer
+        log.debug("Starting id block renewal thread upon {}", currentIndex);
+        idBlockFuture = exec.submit(new IDBlockRunnable());
+    }
+
+    private class IDBlockRunnable implements Runnable {
+        @Override
+        public void run() {
+            renewBuffer();
+        }
+    }
+}