You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2017/09/05 17:20:36 UTC
atlas git commit: ATLAS-2101: Update Implementation to Eliminate Use
of Stopwatch
Repository: atlas
Updated Branches:
refs/heads/master 1dcc3073a -> 7eff37a6f
ATLAS-2101: Update Implementation to Eliminate Use of Stopwatch
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/7eff37a6
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/7eff37a6
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/7eff37a6
Branch: refs/heads/master
Commit: 7eff37a6f4e23f75b4fce75bbc3c77e3258a6c6d
Parents: 1dcc307
Author: ashutoshm <am...@hortonworks.com>
Authored: Tue Sep 5 10:20:27 2017 -0700
Committer: ashutoshm <am...@hortonworks.com>
Committed: Tue Sep 5 10:20:27 2017 -0700
----------------------------------------------------------------------
.../database/idassigner/StandardIDPool.java | 259 +++++++++++++++++++
1 file changed, 259 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/7eff37a6/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java
new file mode 100644
index 0000000..6c7a086
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/database/idassigner/StandardIDPool.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.thinkaurelius.titan.graphdb.database.idassigner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.thinkaurelius.titan.core.TitanException;
+import com.thinkaurelius.titan.core.attribute.Duration;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.IDAuthority;
+import com.thinkaurelius.titan.diskstorage.IDBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+
+public class StandardIDPool implements IDPool {
+
+    /** Logger shared by all instances of this pool. */
+    private static final Logger log = LoggerFactory.getLogger(StandardIDPool.class);
+
+    /** Time unit used when the renew timeout is handed to {@code Future.get}. */
+    private static final TimeUnit SCHEDULING_TIME_UNIT = TimeUnit.MILLISECONDS; // TODO
+
+    /** Lower bound on the number of trailing ids of a block that form the renewal buffer. */
+    private static final int RENEW_ID_COUNT = 100;
+
+    /**
+     * Sentinel stored in {@code nextBlock} when the authority signals exhaustion.
+     * It is only ever compared by identity; both of its methods throw.
+     */
+    private static final IDBlock ID_POOL_EXHAUSTION = new IDBlock() {
+        @Override
+        public long getId(long index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long numIds() {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    /**
+     * Empty placeholder used as {@code currentBlock} until the first real block is acquired.
+     * It reports zero ids, so the first {@code nextID()} call has to fetch a block.
+     */
+    private static final IDBlock UNINITIALIZED_BLOCK = new IDBlock() {
+        @Override
+        public long getId(long index) {
+            throw new ArrayIndexOutOfBoundsException(0);
+        }
+
+        @Override
+        public long numIds() {
+            return 0;
+        }
+    };
+
+    // --- immutable configuration, set once by the constructor ---
+    private final IDAuthority idAuthority;
+    private final int partition;
+    private final int idNamespace;
+    private final long idUpperBound; //exclusive
+    private final Duration renewTimeout;
+    private final double renewBufferPercentage;
+
+    // --- state of the block ids are currently handed out from ---
+    private IDBlock currentBlock;
+    /** Index into {@code currentBlock} of the next id to return. */
+    private long currentIndex;
+    /** Index in {@code currentBlock} at which acquisition of the following block is started. */
+    private long renewBlockIndex;
+
+    // --- asynchronous acquisition of the following block ---
+    /** Written by {@code renewBuffer()}, consumed and reset to null by {@code nextBlock()}. */
+    private volatile IDBlock nextBlock;
+    /** Handle of the pending renewal task, or null when none is outstanding. */
+    private Future<?> idBlockFuture;
+    private final ThreadPoolExecutor exec;
+
+    private volatile boolean initialized;
+    private volatile boolean closed;
+
+ public StandardIDPool(IDAuthority idAuthority, int partition, int idNamespace, long idUpperBound, Duration renewTimeout, double renewBufferPercentage) {
+ Preconditions.checkArgument(idUpperBound > 0);
+ this.idAuthority = idAuthority;
+ Preconditions.checkArgument(partition>=0);
+ this.partition = partition;
+ Preconditions.checkArgument(idNamespace>=0);
+ this.idNamespace = idNamespace;
+ this.idUpperBound = idUpperBound;
+ Preconditions.checkArgument(!renewTimeout.isZeroLength(), "Renew-timeout must be positive");
+ this.renewTimeout = renewTimeout;
+ Preconditions.checkArgument(renewBufferPercentage>0.0 && renewBufferPercentage<=1.0,"Renew-buffer percentage must be in (0.0,1.0]");
+ this.renewBufferPercentage = renewBufferPercentage;
+
+ currentBlock = UNINITIALIZED_BLOCK;
+ currentIndex = 0;
+ renewBlockIndex = 0;
+
+ nextBlock = null;
+
+ // daemon=true would probably be fine too
+ exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
+ .setDaemon(false)
+ .setNameFormat("TitanID(" + partition + ")("+idNamespace+")[%d]")
+ .build());
+ //exec.allowCoreThreadTimeOut(false);
+ //exec.prestartCoreThread();
+ idBlockFuture = null;
+
+ initialized = false;
+ closed = false;
+ }
+
+    /**
+     * Waits for the pending id block request, if there is one, for at most the renew timeout.
+     * The pending-request handle is cleared afterwards regardless of the outcome.
+     *
+     * @throws TitanException       if the request failed, timed out (in which case cancellation
+     *                              of the request is attempted) or had been cancelled
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private void waitForIDRenewer() throws InterruptedException {
+        if (null == idBlockFuture) {
+            return; // nothing outstanding, so there is nothing to time either
+        }
+
+        long start = swStart();
+        try {
+            idBlockFuture.get(renewTimeout.getLength(SCHEDULING_TIME_UNIT), SCHEDULING_TIME_UNIT);
+        } catch (ExecutionException e) {
+            // Elapsed time is a plain millisecond count, so the unit is spelled out in the message.
+            String msg = String.format("ID block allocation on partition(%d)-namespace(%d) failed with an exception in %d ms",
+                    partition, idNamespace, swStop(start));
+            throw new TitanException(msg, e);
+        } catch (TimeoutException e) {
+            // Attempt to cancel the renewer
+            idBlockFuture.cancel(true);
+            String msg = String.format("ID block allocation on partition(%d)-namespace(%d) timed out in %d ms",
+                    partition, idNamespace, swStop(start));
+            throw new TitanException(msg, e);
+        } catch (CancellationException e) {
+            String msg = String.format("ID block allocation on partition(%d)-namespace(%d) was cancelled after %d ms",
+                    partition, idNamespace, swStop(start));
+            throw new TitanException(msg, e);
+        } finally {
+            idBlockFuture = null;
+        }
+        // Allow InterruptedException to propagate up the stack
+    }
+
+ private long swStop(long start) {
+ return swStart() - start;
+ }
+
+ private synchronized void nextBlock() throws InterruptedException {
+ assert currentIndex == currentBlock.numIds();
+ Preconditions.checkState(!closed,"ID Pool has been closed for partition(%s)-namespace(%s) - cannot apply for new id block",
+ partition,idNamespace);
+
+ waitForIDRenewer();
+ if (nextBlock == ID_POOL_EXHAUSTION)
+ throw new IDPoolExhaustedException("Exhausted ID Pool for partition(" + partition+")-namespace("+idNamespace+")");
+
+ Preconditions.checkArgument(nextBlock!=null);
+
+ currentBlock = nextBlock;
+ currentIndex = 0;
+
+ log.debug("ID partition({})-namespace({}) acquired block: [{}]", partition, idNamespace, currentBlock);
+
+ assert currentBlock.numIds()>0;
+
+ nextBlock = null;
+
+ assert RENEW_ID_COUNT>0;
+ renewBlockIndex = Math.max(0,currentBlock.numIds()-Math.max(RENEW_ID_COUNT, Math.round(currentBlock.numIds()*renewBufferPercentage)));
+ assert renewBlockIndex<currentBlock.numIds() && renewBlockIndex>=currentIndex;
+ }
+
+    /**
+     * Requests a new id block from the authority and publishes it in {@code nextBlock}.
+     * If the authority reports exhaustion, the exhaustion sentinel is published instead.
+     *
+     * @throws IllegalArgumentException if a next block is already present, or if the authority
+     *                                  returns null or an empty block
+     * @throws TitanException           if the storage backend fails to provide a block
+     */
+    private void renewBuffer() {
+        Preconditions.checkArgument(nextBlock == null, nextBlock);
+        try {
+            long start = swStart();
+            IDBlock idBlock = idAuthority.getIDBlock(partition, idNamespace, renewTimeout);
+            // Elapsed time is a plain millisecond count, so the unit is spelled out in the message.
+            log.debug("Retrieved ID block from authority on partition({})-namespace({}) in {} ms", partition, idNamespace, swStop(start));
+            Preconditions.checkArgument(idBlock != null && idBlock.numIds() > 0);
+            nextBlock = idBlock;
+        } catch (BackendException e) {
+            throw new TitanException("Could not acquire new ID block from storage", e);
+        } catch (IDPoolExhaustedException e) {
+            // Not an error here: the sentinel lets nextBlock() report exhaustion to the caller.
+            nextBlock = ID_POOL_EXHAUSTION;
+        }
+    }
+
+    /**
+     * Takes a timestamp for measuring elapsed time.
+     *
+     * The reading comes from the monotonic {@code System.nanoTime()} source rather than the
+     * wall clock, so differences between two readings are not distorted by system clock
+     * adjustments. The value is only meaningful relative to another reading from this method.
+     *
+     * @return a monotonic reading expressed in milliseconds
+     */
+    private long swStart() {
+        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+    }
+
+    /**
+     * Returns the next id of the current block. The first call starts acquisition of the first
+     * block; when the current block is used up the prefetched block is swapped in, and when
+     * the renew index is reached acquisition of the following block is started.
+     *
+     * @return the next id, always below the configured upper bound
+     * @throws IDPoolExhaustedException if the id to return reaches the upper bound, or the
+     *                                  pool was found to be exhausted while switching blocks
+     * @throws TitanException           if the thread is interrupted while waiting for a new
+     *                                  block (the interrupt flag is restored before throwing)
+     */
+    @Override
+    public synchronized long nextID() {
+        assert currentIndex <= currentBlock.numIds();
+        if (!initialized) {
+            startNextIDAcquisition();
+            initialized = true;
+        }
+
+        if (currentIndex == currentBlock.numIds()) {
+            try {
+                nextBlock();
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers further up can still observe it.
+                Thread.currentThread().interrupt();
+                throw new TitanException("Could not renew id block due to interruption", e);
+            }
+        }
+
+        if (currentIndex == renewBlockIndex) {
+            startNextIDAcquisition();
+        }
+        long returnId = currentBlock.getId(currentIndex);
+        currentIndex++;
+        if (returnId >= idUpperBound) {
+            throw new IDPoolExhaustedException("Reached id upper bound of " + idUpperBound);
+        }
+        log.trace("partition({})-namespace({}) Returned id: {}", partition, idNamespace, returnId);
+        return returnId;
+    }
+
+    /**
+     * Closes the pool: marks it closed, waits for a pending block request to finish and then
+     * shuts down the renewal executor. The executor is shut down even when waiting fails,
+     * so its non-daemon worker thread is not left behind.
+     *
+     * @throws TitanException if waiting for the renewer fails or is interrupted (the interrupt
+     *                        flag is restored before throwing)
+     */
+    @Override
+    public synchronized void close() {
+        closed = true;
+        try {
+            // Wait for renewer to finish
+            waitForIDRenewer();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            throw new TitanException("Interrupted while waiting for id renewer thread to finish", e);
+        } finally {
+            exec.shutdownNow();
+        }
+    }
+
+    /**
+     * Submits a task to the renewal executor that fetches the following id block, unless the
+     * pool has been closed.
+     *
+     * @throws IllegalArgumentException if a renewal task is already pending
+     */
+    private void startNextIDAcquisition() {
+        Preconditions.checkArgument(idBlockFuture == null, idBlockFuture);
+        if (!closed) { // a closed pool does not renew anymore
+            log.debug("Starting id block renewal thread upon {}", currentIndex);
+            idBlockFuture = exec.submit(new IDBlockRunnable());
+        }
+    }
+
+    /** Task run on the renewal executor; it fetches the following id block for the enclosing pool. */
+    private class IDBlockRunnable implements Runnable {
+        @Override
+        public void run() {
+            StandardIDPool.this.renewBuffer();
+        }
+    }
+}