You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ki...@apache.org on 2016/05/06 16:23:11 UTC
[2/6] incubator-asterixdb git commit: Deadlock-free locking protocol
is enabled
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
deleted file mode 100644
index 7ccd6b9..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.locking;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.NoSuchElementException;
-import java.util.Scanner;
-
-import org.apache.commons.io.FileUtils;
-
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILockManager;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.transaction.management.service.logging.LogManager;
-import org.apache.asterix.transaction.management.service.transaction.TransactionContext;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
-
-public class LockManagerDeterministicUnitTest {
-
- public static void main(String args[]) throws ACIDException, IOException, AsterixException {
- //prepare configuration file
- File cwd = new File(System.getProperty("user.dir"));
- File asterixdbDir = cwd.getParentFile();
- File srcFile = new File(asterixdbDir.getAbsoluteFile(),
- "asterix-app/src/main/resources/asterix-build-configuration.xml");
- File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
- FileUtils.copyFile(srcFile, destFile);
-
- //initialize controller thread
- String requestFileName = new String(
- "src/main/java/org.apache/asterix/transaction/management/service/locking/LockRequestFile");
- Thread t = new Thread(new LockRequestController(requestFileName));
- t.start();
- }
-}
-
-class LockRequestController implements Runnable {
-
- public static final boolean IS_DEBUG_MODE = false;
- TransactionSubsystem txnProvider;
- WorkerReadyQueue workerReadyQueue;
- ArrayList<LockRequest> requestList;
- ArrayList<ArrayList<Integer>> expectedResultList;
- int resultListIndex;
- ILockManager lockMgr;
- String requestFileName;
- long defaultWaitTime;
-
- public LockRequestController(String requestFileName) throws ACIDException, AsterixException {
- this.txnProvider = new TransactionSubsystem("nc1", new TestRuntimeContextProvider(), new AsterixTransactionProperties(
- new AsterixPropertiesAccessor()));
- this.workerReadyQueue = new WorkerReadyQueue();
- this.requestList = new ArrayList<LockRequest>();
- this.expectedResultList = new ArrayList<ArrayList<Integer>>();
- this.lockMgr = txnProvider.getLockManager();
- this.requestFileName = new String(requestFileName);
- this.resultListIndex = 0;
- this.defaultWaitTime = 10;
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("Thread-0");
- HashMap<String, Thread> threadMap = new HashMap<String, Thread>();
- Thread t = null;
- LockRequest lockRequest = null;
- boolean isSuccess = true;
-
- try {
- readRequest();
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(-1);
- } catch (ACIDException e) {
- e.printStackTrace();
- System.exit(-1);
- }
-
- //initialize workerThread
- int size = requestList.size();
- for (int i = 0; i < size; i++) {
- lockRequest = requestList.get(i);
- if (lockRequest.threadName.equals("Thread-0")) {
- //Thread-0 is controller thread.
- continue;
- }
- t = threadMap.get(lockRequest.threadName);
- if (t == null) {
- t = new Thread(new LockRequestWorker(txnProvider, workerReadyQueue, lockRequest.threadName),
- lockRequest.threadName);
- threadMap.put(lockRequest.threadName, t);
- t.start();
- log("Created " + lockRequest.threadName);
- }
- }
-
- //wait for all workerThreads to be ready
- try {
- log("waiting for all workerThreads to complete initialization ...");
- Thread.sleep(5);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- while (workerReadyQueue.size() != threadMap.size()) {
- try {
- log(" .");
- Thread.sleep(5);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- //make workerThread work
- while (requestList.size() != 0) {
- lockRequest = requestList.remove(0);
- log("Processing: " + lockRequest.prettyPrint());
- try {
- if (!handleRequest(lockRequest)) {
- log("\n*** Test Failed ***");
- isSuccess = false;
- break;
- } else {
- log("Processed: " + lockRequest.prettyPrint());
- }
- } catch (ACIDException e) {
- e.printStackTrace();
- break;
- }
- }
-
- if (isSuccess) {
- log("\n*** Test Passed ***");
- }
- ((LogManager) txnProvider.getLogManager()).stop(false, null);
- }
-
- public boolean handleRequest(LockRequest request) throws ACIDException {
- LockRequestWorker worker = null;
- int i = 0;
-
- if (request.requestType == RequestType.CHECK_SEQUENCE) {
- return validateExpectedResult(true);
- } else if (request.requestType == RequestType.CHECK_SET) {
- return validateExpectedResult(false);
- } else if (request.requestType == RequestType.WAIT) {
- try {
- Thread.sleep((long) request.entityHashValue);
- } catch (InterruptedException e) {
- e.printStackTrace();
- return false;
- }
- } else if (request.requestType == RequestType.END) {
- worker = workerReadyQueue.pop(request.threadName);
- while (worker == null) {
- if (!IS_DEBUG_MODE) {
- log(request.threadName + " is not in the workerReadyQueue");
- return false;
- }
- log(Thread.currentThread().getName() + " waiting for " + request.threadName
- + " to be in the workerReadyQueue[" + i++ + "].");
- try {
- Thread.sleep((long) 10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- return false;
- }
- worker = workerReadyQueue.pop(request.threadName);
- }
- synchronized (worker) {
- worker.setDone(true);
- worker.setWait(false);
- worker.notify();
- }
- try {
- Thread.sleep((long) defaultWaitTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- worker = workerReadyQueue.pop(request.threadName);
- while (worker == null) {
- if (!IS_DEBUG_MODE) {
- log(request.threadName + " is not in the workerReadyQueue");
- return false;
- }
- log(Thread.currentThread().getName() + " waiting for " + request.threadName
- + " to be in the workerReadyQueue[" + i++ + "].");
- try {
- Thread.sleep((long) 10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- worker = workerReadyQueue.pop(request.threadName);
- }
-
- synchronized (worker) {
- worker.setLockRequest(request);
- worker.setWait(false);
- worker.notify();
- }
-
- try {
- Thread.sleep((long) defaultWaitTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- return true;
- }
-
- public boolean validateExpectedResult(boolean isSequence) {
-
- if (isSequence) {
- return workerReadyQueue.checkSequence(expectedResultList.get(resultListIndex++));
- } else {
- return workerReadyQueue.checkSet(expectedResultList.get(resultListIndex++));
- }
-
- }
-
- public void readRequest() throws IOException, ACIDException {
- int i = 0;
- LockRequest lockRequest = null;
- TransactionContext txnContext = null;
- HashMap<Integer, TransactionContext> jobMap = new HashMap<Integer, TransactionContext>();
-
- int threadId;
- String requestType;
- int jobId;
- int datasetId;
- int PKHashVal;
- int waitTime;
- ArrayList<Integer> list = null;
- String lockMode;
-
- Scanner scanner = new Scanner(new FileInputStream(requestFileName));
- while (scanner.hasNextLine()) {
- try {
- threadId = Integer.parseInt(scanner.next().substring(1));
- requestType = scanner.next();
- if (requestType.equals("CSQ") || requestType.equals("CST") || requestType.equals("END")) {
- log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType);
- lockRequest = new LockRequest("Thread-" + threadId, getRequestType(requestType));
- if (requestType.equals("CSQ") || requestType.equals("CST")) {
- list = new ArrayList<Integer>();
- while (scanner.hasNextInt()) {
- threadId = scanner.nextInt();
- if (threadId < 0) {
- break;
- }
- list.add(threadId);
- }
- expectedResultList.add(list);
- }
- } else if (requestType.equals("DW")) {
- defaultWaitTime = scanner.nextInt();
- log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType + "," + defaultWaitTime);
- continue;
- } else if (requestType.equals("W")) {
- waitTime = scanner.nextInt();
- log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType);
- lockRequest = new LockRequest("Thread-" + threadId, getRequestType(requestType), waitTime);
- } else {
- jobId = Integer.parseInt(scanner.next().substring(1));
- datasetId = Integer.parseInt(scanner.next().substring(1));
- PKHashVal = Integer.parseInt(scanner.next().substring(1));
- lockMode = scanner.next();
- txnContext = jobMap.get(jobId);
- if (txnContext == null) {
- txnContext = new TransactionContext(new JobId(jobId), txnProvider);
- jobMap.put(jobId, txnContext);
- }
- log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType + ",J" + jobId + ",D" + datasetId
- + ",E" + PKHashVal + "," + lockMode);
- lockRequest = new LockRequest("Thread-" + threadId, getRequestType(requestType), new DatasetId(
- datasetId), PKHashVal, getLockMode(lockMode), txnContext);
- }
-
- requestList.add(lockRequest);
- } catch (NoSuchElementException e) {
- scanner.close();
- break;
- }
- }
- }
-
- public void log(String s) {
- System.out.println(s);
- }
-
- private int getRequestType(String s) {
- if (s.equals("L")) {
- return RequestType.LOCK;
- }
-
- if (s.equals("TL")) {
- return RequestType.TRY_LOCK;
- }
-
- if (s.equals("IL")) {
- return RequestType.INSTANT_LOCK;
- }
-
- if (s.equals("ITL")) {
- return RequestType.INSTANT_TRY_LOCK;
- }
-
- if (s.equals("UL")) {
- return RequestType.UNLOCK;
- }
-
- if (s.equals("RL")) {
- return RequestType.RELEASE_LOCKS;
- }
-
- if (s.equals("CSQ")) {
- return RequestType.CHECK_SEQUENCE;
- }
-
- if (s.equals("CST")) {
- return RequestType.CHECK_SET;
- }
-
- if (s.equals("END")) {
- return RequestType.END;
- }
-
- if (s.equals("W")) {
- return RequestType.WAIT;
- }
-
- try {
- throw new UnsupportedOperationException("Invalid request type:" + s);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- System.exit(0);
- }
-
- return -1;
-
- }
-
- private byte getLockMode(String s) {
- if (s.equals("S")) {
- return LockMode.S;
- }
-
- if (s.equals("X")) {
- return LockMode.X;
- }
-
- try {
- throw new UnsupportedOperationException("Invalid lock mode type:" + s);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- System.exit(0);
- }
-
- return -1;
- }
-}
-
-class LockRequestWorker implements Runnable {
-
- String threadName;
- ILockManager lockMgr;
- WorkerReadyQueue workerReadyQueue;
- LockRequest lockRequest;
- boolean needWait;
- boolean isAwaken;
- boolean isDone;
-
- public LockRequestWorker(TransactionSubsystem txnProvider, WorkerReadyQueue workerReadyQueue, String threadName) {
- this.lockMgr = txnProvider.getLockManager();
- this.workerReadyQueue = workerReadyQueue;
- this.threadName = new String(threadName);
- this.lockRequest = null;
- needWait = true;
- isDone = false;
- isAwaken = false;
- }
-
- public boolean isAwaken() {
- return isAwaken;
- }
-
- @Override
- public void run() {
- //initial wait
- needWait = true;
- isAwaken = false;
-
- while (!isDone) {
- while (needWait) {
- synchronized (this) {
- workerReadyQueue.push(this);
- try {
- this.wait();
- isAwaken = true;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- if (isDone) {
- break;
- }
-
- try {
- sendRequest(lockRequest);
- } catch (ACIDException e) {
- if (lockRequest.txnContext.isTimeout()) {
- if (lockRequest.txnContext.getTxnState() != ITransactionManager.ABORTED) {
- lockRequest.txnContext.setTxnState(ITransactionManager.ABORTED);
- log("*** " + lockRequest.txnContext.getJobId() + " lock request causing deadlock ***");
- log("Abort --> Releasing all locks acquired by " + lockRequest.txnContext.getJobId());
- try {
- lockMgr.releaseLocks(lockRequest.txnContext);
- } catch (ACIDException e1) {
- e1.printStackTrace();
- }
- log("Abort --> Released all locks acquired by " + lockRequest.txnContext.getJobId());
- }
- isDone = true;
- } else {
- e.printStackTrace();
- System.exit(-1);
- }
- }
-
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- needWait = true;
- isAwaken = false;
- }
- }
-
- public void sendRequest(LockRequest request) throws ACIDException {
-
- switch (request.requestType) {
- case RequestType.LOCK:
- lockMgr.lock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
- break;
- case RequestType.INSTANT_LOCK:
- lockMgr.instantLock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
- break;
- case RequestType.TRY_LOCK:
- request.isTryLockFailed = !lockMgr.tryLock(request.datasetIdObj, request.entityHashValue,
- request.lockMode, request.txnContext);
- break;
- case RequestType.INSTANT_TRY_LOCK:
- lockMgr.instantTryLock(request.datasetIdObj, request.entityHashValue, request.lockMode,
- request.txnContext);
- break;
- case RequestType.UNLOCK:
- lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
- break;
- case RequestType.RELEASE_LOCKS:
- lockMgr.releaseLocks(request.txnContext);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported lock method");
- }
- }
-
- public void setLockRequest(LockRequest request) {
- this.lockRequest = request;
- }
-
- public void setWait(boolean wait) {
- needWait = wait;
- }
-
- public void setDone(boolean done) {
- isDone = done;
- }
-
- public String getThreadName() {
- return threadName;
- }
-
- public void log(String s) {
- System.out.println(s);
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("{ t : \"").append(threadName).append("\", r : ");
- if (lockRequest == null) {
- sb.append("null");
- } else {
- sb.append("\"").append(lockRequest.toString()).append("\"");
- }
- sb.append(" }");
- return sb.toString();
- }
-}
-
-class WorkerReadyQueue {
- ArrayList<LockRequestWorker> workerReadyQueue;
-
- public WorkerReadyQueue() {
- workerReadyQueue = new ArrayList<LockRequestWorker>();
- }
-
- public synchronized void push(LockRequestWorker worker) {
- workerReadyQueue.add(worker);
- }
-
- public synchronized LockRequestWorker pop(String threadName) {
- int i;
- LockRequestWorker worker = null;
- int size = workerReadyQueue.size();
- for (i = 0; i < size; i++) {
- worker = workerReadyQueue.get(i);
- if (worker.getThreadName().equals(threadName)) {
- workerReadyQueue.remove(i);
- break;
- }
- }
-
- if (i == size) {
- return null;
- } else {
- return worker;
- }
- }
-
- public synchronized int size() {
- return workerReadyQueue.size();
- }
-
- public boolean checkSet(ArrayList<Integer> threadIdList) {
- int i;
- int j;
- StringBuilder s = new StringBuilder();
- LockRequestWorker worker = null;
- int resultListSize = 0;
- int queueSize = workerReadyQueue.size();
- int listSize = threadIdList.size();
-
- s.append("ExpectedList(Set):\t");
- for (i = 0; i < listSize; i++) {
- s.append(threadIdList.get(i)).append(" ");
- }
- s.append("\n");
-
- while (queueSize < listSize) {
- //wait until workers finish its task
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- log(Thread.currentThread().getName() + " waiting for worker to finish its task...");
- queueSize = workerReadyQueue.size();
- }
-
- if (listSize != queueSize) {
- log("listSize:" + listSize + ", queueSize:" + queueSize);
- return false;
- }
-
- s.append("ResultList(Set):\t");
- for (i = 0; i < listSize; i++) {
- for (j = 0; j < queueSize; j++) {
- worker = workerReadyQueue.get(j);
- if (worker.getThreadName().equals("Thread-" + threadIdList.get(i))) {
- s.append(threadIdList.get(i)).append(" ");
- resultListSize++;
- break;
- }
- }
- }
-
- log(s.toString());
- if (listSize != resultListSize) {
- return false;
- }
-
- return true;
- }
-
- public boolean checkSequence(ArrayList<Integer> threadIdList) {
- int i;
- StringBuilder s = new StringBuilder();
- LockRequestWorker worker = null;
- int queueSize = workerReadyQueue.size();
- int listSize = threadIdList.size();
-
- s.append("ExpectedList(Sequence):\t");
- for (i = 0; i < listSize; i++) {
- s.append(threadIdList.get(i)).append(" ");
- }
- s.append("\n");
-
- while (queueSize < listSize) {
- //wait until workers finish its task
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- log(Thread.currentThread().getName() + " Waiting for worker to finish its task...");
- queueSize = workerReadyQueue.size();
- }
-
- if (queueSize != listSize) {
- return false;
- }
-
- s.append("ResultList(Sequence):\t");
- for (i = 0; i < listSize; i++) {
- worker = workerReadyQueue.get(i);
- if (!worker.getThreadName().equals("Thread-" + threadIdList.get(i))) {
- log(s.toString());
- return false;
- } else {
- s.append(threadIdList.get(i)).append(" ");
- }
- }
-
- log(s.toString());
- return true;
- }
-
- public void log(String s) {
- System.out.println(s);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
deleted file mode 100644
index eab8f46..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.locking;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Random;
-
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILockManager;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.transaction.management.service.logging.LogManager;
-import org.apache.asterix.transaction.management.service.transaction.TransactionContext;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import org.apache.commons.io.FileUtils;
-
-/**
- * LockManagerUnitTest: unit test of LockManager
- *
- * @author kisskys
- */
-
-public class LockManagerRandomUnitTest {
-
- private static final int MAX_NUM_OF_UPGRADE_JOB = 2;//2
- private static final int MAX_NUM_OF_ENTITY_LOCK_JOB = 8;//8
- private static final int MAX_NUM_OF_DATASET_LOCK_JOB = 2;//2
- private static final int MAX_NUM_OF_THREAD_IN_A_JOB = 2; //4
- private static int jobId = 0;
- private static Random rand;
-
- public static void main(String args[]) throws ACIDException, AsterixException, IOException {
- int i;
- //prepare configuration file
- File cwd = new File(System.getProperty("user.dir"));
- File asterixdbDir = cwd.getParentFile();
- File srcFile = new File(asterixdbDir.getAbsoluteFile(),
- "asterix-app/src/main/resources/asterix-build-configuration.xml");
- File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
- FileUtils.copyFile(srcFile, destFile);
-
- TransactionSubsystem txnProvider = new TransactionSubsystem("nc1", null,
- new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
- rand = new Random(System.currentTimeMillis());
- for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
- System.out.println("Creating " + i + "th EntityLockJob..");
- generateEntityLockThread(txnProvider);
- }
-
- for (i = 0; i < MAX_NUM_OF_DATASET_LOCK_JOB; i++) {
- System.out.println("Creating " + i + "th DatasetLockJob..");
- generateDatasetLockThread(txnProvider);
- }
-
- for (i = 0; i < MAX_NUM_OF_UPGRADE_JOB; i++) {
- System.out.println("Creating " + i + "th EntityLockUpgradeJob..");
- generateEntityLockUpgradeThread(txnProvider);
- }
- ((LogManager) txnProvider.getLogManager()).stop(false, null);
- }
-
- private static void generateEntityLockThread(TransactionSubsystem txnProvider) {
- Thread t;
- int childCount = rand.nextInt(MAX_NUM_OF_THREAD_IN_A_JOB);
- if (MAX_NUM_OF_THREAD_IN_A_JOB != 0 && childCount == 0) {
- childCount = 1;
- }
- TransactionContext txnContext = generateTxnContext(txnProvider);
-
- for (int i = 0; i < childCount; i++) {
- System.out.println("Creating " + txnContext.getJobId() + "," + i + "th EntityLockThread..");
- t = new Thread(new LockRequestProducer(txnProvider.getLockManager(), txnContext, false, false, false));
- t.start();
- }
- }
-
- private static void generateDatasetLockThread(TransactionSubsystem txnProvider) {
- Thread t;
- // int childCount = rand.nextInt(MAX_NUM_OF_THREAD_IN_A_JOB);
- // if (MAX_NUM_OF_THREAD_IN_A_JOB != 0 && childCount == 0) {
- // childCount = 1;
- // }
- int childCount = 1;
-
- TransactionContext txnContext = generateTxnContext(txnProvider);
-
- for (int i = 0; i < childCount; i++) {
- System.out.println("Creating " + txnContext.getJobId() + "," + i + "th DatasetLockThread..");
- t = new Thread(new LockRequestProducer(txnProvider.getLockManager(), txnContext, true, false, false));
- t.start();
- }
- }
-
- private static void generateEntityLockUpgradeThread(TransactionSubsystem txnProvider) {
- int i;
- Thread t;
- int childCount = MAX_NUM_OF_THREAD_IN_A_JOB;
- if (MAX_NUM_OF_THREAD_IN_A_JOB != 0 && childCount == 0) {
- childCount = 1;
- }
- TransactionContext txnContext = generateTxnContext(txnProvider);
-
- for (i = 0; i < childCount - 1; i++) {
- System.out.println("Creating " + txnContext.getJobId() + "," + i + "th EntityLockUpgradeThread(false)..");
- t = new Thread(new LockRequestProducer(txnProvider.getLockManager(), txnContext, false, true, false));
- t.start();
- }
- System.out.println("Creating " + txnContext.getJobId() + "," + i + "th EntityLockUpgradeThread(true)..");
- t = new Thread(new LockRequestProducer(txnProvider.getLockManager(), txnContext, false, true, true));
- t.start();
- }
-
- private static TransactionContext generateTxnContext(TransactionSubsystem txnProvider) {
- try {
- return new TransactionContext(new JobId(jobId++), txnProvider);
- } catch (ACIDException e) {
- e.printStackTrace();
- return null;
- }
- }
-
-}
-
-class LockRequestProducer implements Runnable {
-
- private static final int MAX_DATASET_NUM = 10;//10
- private static final int MAX_ENTITY_NUM = 30;//30
- private static final int MAX_LOCK_MODE_NUM = 2;
- private static final long DATASET_LOCK_THREAD_SLEEP_TIME = 1000;
- private static final int MAX_LOCK_REQUEST_TYPE_NUM = 4;
-
- private ILockManager lockMgr;
- private TransactionContext txnContext;
- private Random rand;
- private boolean isDatasetLock; //dataset or entity
- private ArrayList<LockRequest> requestQueue;
- private StringBuilder requestHistory;
- private int unlockIndex;
- private int upgradeIndex;
- private boolean isUpgradeThread;
- private boolean isUpgradeThreadJob;
- private boolean isDone;
-
- public LockRequestProducer(ILockManager lockMgr, TransactionContext txnContext, boolean isDatasetLock,
- boolean isUpgradeThreadJob, boolean isUpgradeThread) {
- this.lockMgr = lockMgr;
- this.txnContext = txnContext;
- this.isDatasetLock = isDatasetLock;
- this.isUpgradeThreadJob = isUpgradeThreadJob;
- this.isUpgradeThread = isUpgradeThread;
-
- this.rand = new Random(System.currentTimeMillis());
- requestQueue = new ArrayList<LockRequest>();
- requestHistory = new StringBuilder();
- unlockIndex = 0;
- upgradeIndex = 0;
- isDone = false;
- }
-
- @Override
- public void run() {
- try {
- if (isDatasetLock) {
- System.out.println("DatasetLockThread(" + Thread.currentThread().getName() + ") is running...");
- runDatasetLockTask();
- } else {
- System.out.println("EntityLockThread(" + Thread.currentThread().getName() + "," + isUpgradeThreadJob
- + "," + isUpgradeThread + ") is running...");
- runEntityLockTask();
- }
- } catch (Exception e) {
- e.printStackTrace();
- return;
- } finally {
-
- /*
- System.out.println("" + Thread.currentThread().getName() + "\n" + requestHistory.toString() + ""
- + Thread.currentThread().getName() + "\n");
- System.out.println("RequestHistoryPerJobId\n" + ((LockManager) lockMgr).getLocalRequestHistory());
- System.out.println("");
- System.out.println("GlobalRequestHistory\n" + ((LockManager) lockMgr).getGlobalRequestHistory());
- System.out.println("");
- */
- }
- }
-
- private void runDatasetLockTask() {
- try {
- produceDatasetLockRequest();
- if (isDone) {
- return;
- }
- } catch (ACIDException e) {
- e.printStackTrace();
- return;
- }
-
- try {
- Thread.sleep(DATASET_LOCK_THREAD_SLEEP_TIME);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- try {
- produceDatasetUnlockRequest();
- if (isDone) {
- return;
- }
- } catch (ACIDException e) {
- e.printStackTrace();
- return;
- }
- }
-
- private void runEntityLockTask() {
- int i;
- byte lockMode;
- int lockCount;
- int upgradeCount;
- int releaseCount;
- boolean mayRelease = false;
-
- lockCount = 1 + rand.nextInt(20);
- if (isUpgradeThreadJob) {
- if (isUpgradeThread) {
- upgradeCount = 1; //rand.nextInt(4) + 1;
- if (upgradeCount > lockCount) {
- upgradeCount = lockCount;
- }
- } else {
- upgradeCount = 0;
- }
- lockMode = LockMode.S;
- } else {
- upgradeCount = 0;
- lockMode = (byte) (this.txnContext.getJobId().getId() % 2);
- }
- releaseCount = rand.nextInt(5) % 3 == 0 ? 1 : 0;
-
- //lock
- for (i = 0; i < lockCount; i++) {
- try {
- produceEntityLockRequest(lockMode);
- if (isDone) {
- return;
- }
- } catch (ACIDException e) {
- e.printStackTrace();
- return;
- }
- }
-
- //upgrade
- for (i = 0; i < upgradeCount; i++) {
- try {
- produceEntityLockUpgradeRequest();
- if (isDone) {
- return;
- }
- } catch (ACIDException e) {
- e.printStackTrace();
- return;
- }
- }
-
- //unlock or releaseLocks
- if (releaseCount == 0) {
- //unlock
- for (i = 0; i < lockCount; i++) {
- try {
- produceEntityUnlockRequest();
- if (isDone) {
- return;
- }
- } catch (ACIDException e) {
- e.printStackTrace();
- return;
- }
- }
- } else {
- try {
- synchronized (txnContext) {
- if (txnContext.getTxnState() != ITransactionManager.ABORTED) {
- txnContext.setTxnState(ITransactionManager.ABORTED);
- mayRelease = true;
- }
- }
- if (mayRelease) {
- produceEntityReleaseLocksRequest();
- }
- } catch (ACIDException e) {
- e.printStackTrace();
- return;
- }
- }
- }
-
- private void produceDatasetLockRequest() throws ACIDException {
- int requestType = RequestType.LOCK;
- int datasetId = rand.nextInt(MAX_DATASET_NUM);
- int entityHashValue = -1;
- byte lockMode = (byte) (rand.nextInt(MAX_LOCK_MODE_NUM));
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, new DatasetId(datasetId),
- entityHashValue, lockMode, txnContext);
- requestQueue.add(request);
- requestHistory.append(request.prettyPrint());
- sendRequest(request);
- }
-
- private void produceDatasetUnlockRequest() throws ACIDException {
- LockRequest lockRequest = requestQueue.get(0);
-
- int requestType = RequestType.RELEASE_LOCKS;
- int datasetId = lockRequest.datasetIdObj.getId();
- int entityHashValue = -1;
- byte lockMode = LockMode.S;//lockMode is not used for unlock() call.
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, new DatasetId(datasetId),
- entityHashValue, lockMode, txnContext);
- requestQueue.add(request);
- requestHistory.append(request.prettyPrint());
- sendRequest(request);
- }
-
- private void produceEntityLockRequest(byte lockMode) throws ACIDException {
- int requestType = rand.nextInt(MAX_LOCK_REQUEST_TYPE_NUM);
- int datasetId = rand.nextInt(MAX_DATASET_NUM);
- int entityHashValue = rand.nextInt(MAX_ENTITY_NUM);
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, new DatasetId(datasetId),
- entityHashValue, lockMode, txnContext);
- requestQueue.add(request);
- requestHistory.append(request.prettyPrint());
- sendRequest(request);
- }
-
- private void produceEntityLockUpgradeRequest() throws ACIDException {
- LockRequest lockRequest = null;
- int size = requestQueue.size();
- boolean existLockRequest = false;
-
- while (upgradeIndex < size) {
- lockRequest = requestQueue.get(upgradeIndex++);
- if (lockRequest.isUpgrade || lockRequest.isTryLockFailed) {
- continue;
- }
- if (lockRequest.requestType == RequestType.UNLOCK || lockRequest.requestType == RequestType.RELEASE_LOCKS
- || lockRequest.requestType == RequestType.INSTANT_LOCK
- || lockRequest.requestType == RequestType.INSTANT_TRY_LOCK) {
- continue;
- }
- if (lockRequest.lockMode == LockMode.X) {
- continue;
- }
- existLockRequest = true;
- break;
- }
-
- if (existLockRequest) {
- int requestType = lockRequest.requestType;
- int datasetId = lockRequest.datasetIdObj.getId();
- int entityHashValue = lockRequest.entityHashValue;
- byte lockMode = LockMode.X;
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType,
- new DatasetId(datasetId), entityHashValue, lockMode, txnContext);
- request.isUpgrade = true;
- requestQueue.add(request);
- requestHistory.append(request.prettyPrint());
- sendRequest(request);
- }
- }
-
- private void produceEntityUnlockRequest() throws ACIDException {
- LockRequest lockRequest = null;
- int size = requestQueue.size();
- boolean existLockRequest = false;
-
- while (unlockIndex < size) {
- lockRequest = requestQueue.get(unlockIndex++);
- if (lockRequest.isUpgrade || lockRequest.isTryLockFailed) {
- continue;
- }
- if (lockRequest.requestType == RequestType.UNLOCK || lockRequest.requestType == RequestType.RELEASE_LOCKS
- || lockRequest.requestType == RequestType.INSTANT_LOCK
- || lockRequest.requestType == RequestType.INSTANT_TRY_LOCK) {
- continue;
- }
- existLockRequest = true;
- break;
- }
-
- if (existLockRequest) {
- int requestType = RequestType.UNLOCK;
- int datasetId = lockRequest.datasetIdObj.getId();
- int entityHashValue = lockRequest.entityHashValue;
- byte lockMode = lockRequest.lockMode;
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType,
- new DatasetId(datasetId), entityHashValue, lockMode, txnContext);
- requestQueue.add(request);
- requestHistory.append(request.prettyPrint());
- sendRequest(request);
- }
- }
-
- private void produceEntityReleaseLocksRequest() throws ACIDException {
- LockRequest lockRequest = null;
- int size = requestQueue.size();
- boolean existLockRequest = false;
-
- while (unlockIndex < size) {
- lockRequest = requestQueue.get(unlockIndex++);
- if (lockRequest.isUpgrade || lockRequest.isTryLockFailed) {
- continue;
- }
- if (lockRequest.requestType == RequestType.UNLOCK || lockRequest.requestType == RequestType.RELEASE_LOCKS
- || lockRequest.requestType == RequestType.INSTANT_LOCK
- || lockRequest.requestType == RequestType.INSTANT_TRY_LOCK) {
- continue;
- }
- existLockRequest = true;
- break;
- }
-
- if (existLockRequest) {
- int requestType = RequestType.RELEASE_LOCKS;
- int datasetId = lockRequest.datasetIdObj.getId();
- int entityHashValue = lockRequest.entityHashValue;
- byte lockMode = lockRequest.lockMode;
- LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType,
- new DatasetId(datasetId), entityHashValue, lockMode, txnContext);
- requestQueue.add(request);
- requestHistory.append(request.prettyPrint());
- sendRequest(request);
- }
- }
-
- private void sendRequest(LockRequest request) throws ACIDException {
-
- switch (request.requestType) {
- case RequestType.LOCK:
- try {
- lockMgr.lock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
- } catch (ACIDException e) {
- if (request.txnContext.isTimeout()) {
- if (request.txnContext.getTxnState() != ITransactionManager.ABORTED) {
- request.txnContext.setTxnState(ITransactionManager.ABORTED);
- log("*** " + request.txnContext.getJobId() + " lock request causing deadlock ***");
- log("Abort --> Releasing all locks acquired by " + request.txnContext.getJobId());
- try {
- lockMgr.releaseLocks(request.txnContext);
- } catch (ACIDException e1) {
- e1.printStackTrace();
- }
- log("Abort --> Released all locks acquired by " + request.txnContext.getJobId());
- }
- isDone = true;
- } else {
- throw e;
- }
- }
- break;
- case RequestType.INSTANT_LOCK:
- try {
- lockMgr.instantLock(request.datasetIdObj, request.entityHashValue, request.lockMode,
- request.txnContext);
- } catch (ACIDException e) {
- if (request.txnContext.isTimeout()) {
- if (request.txnContext.getTxnState() != ITransactionManager.ABORTED) {
- request.txnContext.setTxnState(ITransactionManager.ABORTED);
- log("*** " + request.txnContext.getJobId() + " lock request causing deadlock ***");
- log("Abort --> Releasing all locks acquired by " + request.txnContext.getJobId());
- try {
- lockMgr.releaseLocks(request.txnContext);
- } catch (ACIDException e1) {
- e1.printStackTrace();
- }
- log("Abort --> Released all locks acquired by " + request.txnContext.getJobId());
- }
- isDone = true;
- } else {
- throw e;
- }
- }
- break;
- case RequestType.TRY_LOCK:
- request.isTryLockFailed = !lockMgr.tryLock(request.datasetIdObj, request.entityHashValue,
- request.lockMode, request.txnContext);
- break;
- case RequestType.INSTANT_TRY_LOCK:
- lockMgr.instantTryLock(request.datasetIdObj, request.entityHashValue, request.lockMode,
- request.txnContext);
- break;
- case RequestType.UNLOCK:
- lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
- break;
- case RequestType.RELEASE_LOCKS:
- lockMgr.releaseLocks(request.txnContext);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported lock method");
- }
- try {
- Thread.sleep(0);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- private void log(String s) {
- System.out.println(s);
- }
-}
-
-class LockRequest {
- public int requestType;
- public DatasetId datasetIdObj;
- public int entityHashValue;
- public byte lockMode;
- public ITransactionContext txnContext;
- public boolean isUpgrade;
- public boolean isTryLockFailed;
- public long requestTime;
- public String threadName;
-
- public LockRequest(String threadName, int requestType, DatasetId datasetIdObj, int entityHashValue, byte lockMode,
- ITransactionContext txnContext) {
- this.requestType = requestType;
- this.datasetIdObj = datasetIdObj;
- this.entityHashValue = entityHashValue;
- this.lockMode = lockMode;
- this.txnContext = txnContext;
- this.requestTime = System.currentTimeMillis();
- this.threadName = new String(threadName);
- isUpgrade = false;
- isTryLockFailed = false;//used for TryLock request not to call Unlock when the tryLock failed.
- }
-
- public LockRequest(String threadName, int requestType) {
- this.requestType = requestType;
- this.requestTime = System.currentTimeMillis();
- this.threadName = new String(threadName);
- }
-
- //used for "W" request type
- public LockRequest(String threadName, int requestType, int waitTime) {
- this.requestType = requestType;
- this.requestTime = System.currentTimeMillis();
- this.threadName = new String(threadName);
- this.entityHashValue = waitTime;
- }
-
- @Override
- public String toString() {
- return prettyPrint();
- }
-
- public String prettyPrint() {
- StringBuilder s = new StringBuilder();
- //s.append(threadName.charAt(7)).append("\t").append("\t");
- s.append("T").append(threadName.substring(7)).append("\t");
- switch (requestType) {
- case RequestType.LOCK:
- s.append("L");
- break;
- case RequestType.TRY_LOCK:
- s.append("TL");
- break;
- case RequestType.INSTANT_LOCK:
- s.append("IL");
- break;
- case RequestType.INSTANT_TRY_LOCK:
- s.append("ITL");
- break;
- case RequestType.UNLOCK:
- s.append("UL");
- break;
- case RequestType.RELEASE_LOCKS:
- s.append("RL");
- break;
- case RequestType.CHECK_SEQUENCE:
- s.append("CSQ");
- return s.toString();
- case RequestType.CHECK_SET:
- s.append("CST");
- return s.toString();
- case RequestType.END:
- s.append("END");
- return s.toString();
- case RequestType.WAIT:
- s.append("W").append("\t").append(entityHashValue);
- return s.toString();
- default:
- throw new UnsupportedOperationException("Unsupported method");
- }
- s.append("\tJ").append(txnContext.getJobId().getId()).append("\tD").append(datasetIdObj.getId()).append("\tE")
- .append(entityHashValue).append("\t");
- s.append(LockMode.toString(lockMode)).append("\n");
- return s.toString();
- }
-}
-
-class RequestType {
- public static final int LOCK = 0;
- public static final int TRY_LOCK = 1;
- public static final int INSTANT_LOCK = 2;
- public static final int INSTANT_TRY_LOCK = 3;
- public static final int UNLOCK = 4;
- public static final int RELEASE_LOCKS = 5;
- public static final int CHECK_SEQUENCE = 6;
- public static final int CHECK_SET = 7;
- public static final int END = 8;
- public static final int WAIT = 9;
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockRequestTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockRequestTracker.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockRequestTracker.java
deleted file mode 100644
index 6fb0675..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockRequestTracker.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.locking;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-public class LockRequestTracker {
- HashMap<Integer, StringBuilder> historyPerJob; //per job
- StringBuilder historyForAllJobs;
- StringBuilder requestHistoryForAllJobs; //request only
-
- public LockRequestTracker() {
- historyForAllJobs = new StringBuilder();
- historyPerJob = new HashMap<Integer, StringBuilder>();
- requestHistoryForAllJobs = new StringBuilder();
- }
-
- public void addEvent(String msg, LockRequest request) {
- int jobId = request.txnContext.getJobId().getId();
- StringBuilder jobHistory = historyPerJob.get(jobId);
-
- //update jobHistory
- if (jobHistory == null) {
- jobHistory = new StringBuilder();
- }
- jobHistory.append(request.prettyPrint()).append("--> ").append(msg).append("\n");
- historyPerJob.put(jobId, jobHistory);
-
- //handle global request queue
- historyForAllJobs.append(request.prettyPrint()).append("--> ").append(msg).append("\n");
- }
-
- public void addRequest(LockRequest request) {
- requestHistoryForAllJobs.append(request.prettyPrint());
- }
-
- public String getHistoryForAllJobs() {
- return historyForAllJobs.toString();
- }
-
- public String getHistoryPerJob() {
- StringBuilder history = new StringBuilder();
- Set<Entry<Integer, StringBuilder>> s = historyPerJob.entrySet();
- Iterator<Entry<Integer, StringBuilder>> iter = s.iterator();
- while (iter.hasNext()) {
- Map.Entry<Integer, StringBuilder> entry = (Map.Entry<Integer, StringBuilder>) iter.next();
- history.append(entry.getValue().toString());
- }
- return history.toString();
- }
-
- public String getRequestHistoryForAllJobs() {
- return requestHistoryForAllJobs.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiter.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiter.java
deleted file mode 100644
index 82f0877..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiter.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.transaction.management.service.locking;
-
-/**
- * LockWaiter object is used for keeping a lock waiter or a lock upgrader information on a certain resource.
- * The resource can be a dataset or an entity.
- *
- * @author kisskys
- */
-public class LockWaiter {
- /**
- * entityInfoSlotNum:
- * If this LockWaiter object is used, this variable is used to indicate the corresponding EntityInfoSlotNum.
- * Otherwise, the variable is used for nextFreeSlot Which indicates the next free waiter object.
- */
- private int entityInfoSlotNum;
- private boolean wait;
- private boolean victim;
- private byte waiterCount;
- private boolean firstGetUp;
- private int nextWaiterObjId; //used for DatasetLockInfo and EntityLockInfo
- private int nextWaitingResourceObjId; //used for JobInfo
- private long beginWaitTime;
- private boolean isWaiter; //is upgrader or waiter
- private boolean isWaitingOnEntityLock; //is waiting on datasetLock or entityLock
-
- public LockWaiter() {
- this.victim = false;
- this.wait = true;
- waiterCount = 0;
- nextWaiterObjId = -1;
- nextWaitingResourceObjId = -1;
- }
-
- public void setEntityInfoSlot(int slotNum) {
- this.entityInfoSlotNum = slotNum;
- }
-
- public int getEntityInfoSlot() {
- return this.entityInfoSlotNum;
- }
-
- public void setNextFreeSlot(int slotNum) {
- this.entityInfoSlotNum = slotNum;
- }
-
- public int getNextFreeSlot() {
- return this.entityInfoSlotNum;
- }
-
- public void setWait(boolean wait) {
- this.wait = wait;
- }
-
- public boolean needWait() {
- return this.wait;
- }
-
- public void setVictim(boolean victim) {
- this.victim = victim;
- }
-
- public boolean isVictim() {
- return this.victim;
- }
-
- public void increaseWaiterCount() {
- waiterCount++;
- }
-
- public void decreaseWaiterCount() {
- waiterCount--;
- }
-
- public byte getWaiterCount() {
- return waiterCount;
- }
-
- public void setWaiterCount(byte count) {
- waiterCount = count;
- }
-
- public void setFirstGetUp(boolean isFirst) {
- firstGetUp = isFirst;
- }
-
- public boolean isFirstGetUp() {
- return firstGetUp;
- }
-
- public void setNextWaiterObjId(int next) {
- nextWaiterObjId = next;
- }
-
- public int getNextWaiterObjId() {
- return nextWaiterObjId;
- }
-
- public void setNextWaitingResourceObjId(int next) {
- nextWaitingResourceObjId = next;
- }
-
- public int getNextWaitingResourceObjId() {
- return nextWaitingResourceObjId;
- }
-
- public void setBeginWaitTime(long time) {
- this.beginWaitTime = time;
- }
-
- public long getBeginWaitTime() {
- return beginWaitTime;
- }
-
- public boolean isWaiter() {
- return isWaiter;
- }
-
- public void setWaiter(boolean isWaiter) {
- this.isWaiter = isWaiter;
- }
-
- public boolean isWaitingOnEntityLock() {
- return isWaitingOnEntityLock;
- }
-
- public void setWaitingOnEntityLock(boolean isWaitingOnEntityLock) {
- this.isWaitingOnEntityLock = isWaitingOnEntityLock;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiterManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiterManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiterManager.java
deleted file mode 100644
index ae971fb..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockWaiterManager.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.transaction.management.service.locking;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-
-/**
- * LockWaiterManager manages LockWaiter objects array.
- * The array grows when the slots are overflowed.
- * Also, the array shrinks according to the following shrink policy
- * : Shrink when the resource under-utilization lasts for a certain threshold time.
- *
- * @author kisskys
- */
-public class LockWaiterManager {
-
- public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
-
- private ArrayList<ChildLockWaiterArrayManager> pArray; //parent array
- private int allocChild; //used to allocate the next free LockWaiter object.
- private long shrinkTimer;
- private boolean isShrinkTimerOn;
- private int occupiedSlots;
-
- // ////////////////////////////////////////////////
- // // begin of unit test
- // ////////////////////////////////////////////////
- //
- // public static final int SHRINK_TIMER_THRESHOLD = 0; //for unit test
- //
- // /**
- // * @param args
- // */
- // public static void main(String[] args) {
- // final int DataSize = 5000;
- //
- // int i, j;
- // int slots = ChildLockWaiterArrayManager.NUM_OF_SLOTS;
- // int data[] = new int[DataSize];
- // LockWaiterManager lwMgr = new LockWaiterManager();
- //
- // //allocate: 50
- // System.out.println("allocate: 50");
- // for (i = 0; i < 5; i++) {
- // for (j = i * slots; j < i * slots + slots; j++) {
- // data[j] = lwMgr.allocate();
- // }
- //
- // System.out.println(lwMgr.prettyPrint());
- // }
- //
- // //deallocate from the last child to the first child
- // System.out.println("deallocate from the last child to the first child");
- // for (i = 4; i >= 0; i--) {
- // for (j = i * slots + slots - 1; j >= i * slots; j--) {
- // lwMgr.deallocate(data[j]);
- // }
- // System.out.println(lwMgr.prettyPrint());
- // }
- //
- // //allocate: 50
- // System.out.println("allocate: 50");
- // for (i = 0; i < 5; i++) {
- // for (j = i * slots; j < i * slots + slots; j++) {
- // data[j] = lwMgr.allocate();
- // }
- //
- // System.out.println(lwMgr.prettyPrint());
- // }
- //
- // //deallocate from the first child to last child
- // System.out.println("deallocate from the first child to last child");
- // for (i = 0; i < 5; i++) {
- // for (j = i * slots; j < i * slots + slots; j++) {
- // lwMgr.deallocate(data[j]);
- // }
- //
- // System.out.println(lwMgr.prettyPrint());
- // }
- //
- // //allocate: 50
- // System.out.println("allocate: 50");
- // for (i = 0; i < 5; i++) {
- // for (j = i * slots; j < i * slots + slots; j++) {
- // data[j] = lwMgr.allocate();
- // }
- //
- // System.out.println(lwMgr.prettyPrint());
- // }
- //
- // //deallocate from the first child to 4th child
- // System.out.println("deallocate from the first child to 4th child");
- // for (i = 0; i < 4; i++) {
- // for (j = i * slots; j < i * slots + slots; j++) {
- // lwMgr.deallocate(data[j]);
- // }
- //
- // System.out.println(lwMgr.prettyPrint());
- // }
- //
- // //allocate: 40
- // System.out.println("allocate: 40");
- // for (i = 0; i < 4; i++) {
- // for (j = i * slots; j < i * slots + slots; j++) {
- // data[j] = lwMgr.allocate();
- // }
- //
- // System.out.println(lwMgr.prettyPrint());
- // }
- // }
- //
- // ////////////////////////////////////////////////
- // // end of unit test
- // ////////////////////////////////////////////////
-
- public LockWaiterManager() {
- pArray = new ArrayList<ChildLockWaiterArrayManager>();
- pArray.add(new ChildLockWaiterArrayManager());
- allocChild = 0;
- occupiedSlots = 0;
- isShrinkTimerOn = false;
- }
-
- public int allocate() {
- if (pArray.get(allocChild).isFull()) {
- int size = pArray.size();
- boolean bAlloc = false;
- ChildLockWaiterArrayManager child;
-
- //find a deinitialized child and initialize it
- for (int i = 0; i < size; i++) {
- child = pArray.get(i);
- if (child.isDeinitialized()) {
- child.initialize();
- allocChild = i;
- bAlloc = true;
- break;
- }
- }
-
- //allocate new child when there is no deinitialized child
- if (!bAlloc) {
- pArray.add(new ChildLockWaiterArrayManager());
- allocChild = pArray.size() - 1;
- }
- }
- occupiedSlots++;
- return pArray.get(allocChild).allocate() + allocChild * ChildLockWaiterArrayManager.NUM_OF_SLOTS;
- }
-
- void deallocate(int slotNum) {
- pArray.get(slotNum / ChildLockWaiterArrayManager.NUM_OF_SLOTS).deallocate(
- slotNum % ChildLockWaiterArrayManager.NUM_OF_SLOTS);
- occupiedSlots--;
-
- if (needShrink()) {
- shrink();
- }
- }
-
- /**
- * Shrink policy:
- * Shrink when the resource under-utilization lasts for a certain amount of time.
- * TODO Need to figure out which of the policies is better
- * case1.
- * pArray status : O x x x x x O (O is initialized, x is deinitialized)
- * In the above status, 'CURRENT' needShrink() returns 'TRUE'
- * even if there is nothing to shrink or deallocate.
- * It doesn't distinguish the deinitialized children from initialized children
- * by calculating totalNumOfSlots = pArray.size() * ChildLockWaiterArrayManager.NUM_OF_SLOTS.
- * In other words, it doesn't subtract the deinitialized children's slots.
- * case2.
- * pArray status : O O x x x x x
- * However, in the above case, if we subtract the deinitialized children's slots,
- * needShrink() will return false even if we shrink the pArray at this case.
- *
- * @return
- */
- private boolean needShrink() {
- int size = pArray.size();
- int usedSlots = occupiedSlots;
- if (usedSlots == 0) {
- usedSlots = 1;
- }
-
- if (size > 1 && size * ChildLockWaiterArrayManager.NUM_OF_SLOTS / usedSlots >= 3) {
- if (isShrinkTimerOn) {
- if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
- isShrinkTimerOn = false;
- return true;
- }
- } else {
- //turn on timer
- isShrinkTimerOn = true;
- shrinkTimer = System.currentTimeMillis();
- }
- } else {
- //turn off timer
- isShrinkTimerOn = false;
- }
-
- return false;
- }
-
- /**
- * Shrink() may
- * deinitialize(:deallocates array of LockWaiter objects in a child) Children(s) or
- * shrink pArray according to the deinitialized children's contiguity status.
- * It doesn't deinitialize or shrink more than half of children at a time.
- */
- private void shrink() {
- int i;
- int removeCount = 0;
- int size = pArray.size();
- int maxDecreaseCount = size / 2;
- ChildLockWaiterArrayManager child;
-
- //The first buffer never be deinitialized.
- for (i = 1; i < size; i++) {
- if (pArray.get(i).isEmpty()) {
- pArray.get(i).deinitialize();
- }
- }
-
- //remove the empty buffers from the end
- for (i = size - 1; i >= 1; i--) {
- child = pArray.get(i);
- if (child.isDeinitialized()) {
- pArray.remove(i);
- if (++removeCount == maxDecreaseCount) {
- break;
- }
- } else {
- break;
- }
- }
-
- //reset allocChild to the first buffer
- allocChild = 0;
-
- isShrinkTimerOn = false;
- }
-
- public String prettyPrint() {
- StringBuilder s = new StringBuilder("\n########### LockWaiterManager Status #############\n");
- int size = pArray.size();
- ChildLockWaiterArrayManager child;
-
- for (int i = 0; i < size; i++) {
- child = pArray.get(i);
- if (child.isDeinitialized()) {
- continue;
- }
- s.append("child[" + i + "]");
- s.append(child.prettyPrint());
- }
- return s.toString();
- }
-
- public void coreDump(OutputStream os) {
- StringBuilder sb = new StringBuilder("\n########### LockWaiterManager Status #############\n");
- int size = pArray.size();
- ChildLockWaiterArrayManager child;
-
- sb.append("Number of Child: " + size + "\n");
- for (int i = 0; i < size; i++) {
- try {
- child = pArray.get(i);
- sb.append("child[" + i + "]");
- sb.append(child.prettyPrint());
-
- os.write(sb.toString().getBytes());
- } catch (IOException e) {
- //ignore IOException
- }
- sb = new StringBuilder();
- }
- }
-
- public int getShrinkTimerThreshold() {
- return SHRINK_TIMER_THRESHOLD;
- }
-
- public LockWaiter getLockWaiter(int slotNum) {
- return pArray.get(slotNum / ChildLockWaiterArrayManager.NUM_OF_SLOTS).getLockWaiter(
- slotNum % ChildLockWaiterArrayManager.NUM_OF_SLOTS);
- }
-}
-
-class ChildLockWaiterArrayManager {
- public static final int NUM_OF_SLOTS = 100; //number of LockWaiter objects in 'childArray'.
- // public static final int NUM_OF_SLOTS = 10; //for unit test
-
- private int freeSlotNum;
- private int occupiedSlots; //-1 represents 'deinitialized' state.
- LockWaiter childArray[];//childArray
-
- public ChildLockWaiterArrayManager() {
- initialize();
- }
-
- public void initialize() {
- this.childArray = new LockWaiter[NUM_OF_SLOTS];
- this.freeSlotNum = 0;
- this.occupiedSlots = 0;
-
- for (int i = 0; i < NUM_OF_SLOTS - 1; i++) {
- childArray[i] = new LockWaiter();
- childArray[i].setNextFreeSlot(i + 1);
- }
- childArray[NUM_OF_SLOTS - 1] = new LockWaiter();
- childArray[NUM_OF_SLOTS - 1].setNextFreeSlot(-1); //-1 represents EOL(end of link)
- }
-
- public LockWaiter getLockWaiter(int slotNum) {
- return childArray[slotNum];
- }
-
- public int allocate() {
- int currentSlot = freeSlotNum;
- freeSlotNum = childArray[currentSlot].getNextFreeSlot();
- childArray[currentSlot].setWait(true);
- childArray[currentSlot].setVictim(false);
- childArray[currentSlot].setWaiterCount((byte) 0);
- childArray[currentSlot].setNextWaiterObjId(-1);
- childArray[currentSlot].setNextWaitingResourceObjId(-1);
- childArray[currentSlot].setBeginWaitTime(-1l);
- occupiedSlots++;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName() + " Alloc LockWaiterId(" + currentSlot + ")");
- }
- return currentSlot;
- }
-
- public void deallocate(int slotNum) {
- childArray[slotNum].setNextFreeSlot(freeSlotNum);
- freeSlotNum = slotNum;
- occupiedSlots--;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName() + " Dealloc LockWaiterId(" + slotNum + ")");
- }
- }
-
- public void deinitialize() {
- childArray = null;
- occupiedSlots = -1;
- }
-
- public boolean isDeinitialized() {
- return occupiedSlots == -1;
- }
-
- public boolean isFull() {
- return occupiedSlots == NUM_OF_SLOTS;
- }
-
- public boolean isEmpty() {
- return occupiedSlots == 0;
- }
-
- public int getNumOfOccupiedSlots() {
- return occupiedSlots;
- }
-
- public int getFreeSlotNum() {
- return freeSlotNum;
- }
-
- public String prettyPrint() {
- LockWaiter waiter;
- StringBuilder sb = new StringBuilder();
- sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
- sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
- for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
- waiter = getLockWaiter(j);
- sb.append(j).append(": ");
- sb.append("\t" + waiter.getEntityInfoSlot());
- sb.append("\t" + waiter.needWait());
- sb.append("\t" + waiter.isVictim());
- sb.append("\n");
- }
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TimeOutDetector.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TimeOutDetector.java
deleted file mode 100644
index bd20c43..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.locking;
-
-import java.util.LinkedList;
-import java.util.concurrent.Executor;
-
-import org.apache.asterix.common.exceptions.ACIDException;
-
-/**
- * @author pouria, kisskys
- * Any transaction which has been waiting for a lock for more
- * than the predefined time-out threshold is considered to be deadlocked
- * (this can happen in distributed case for example) An instance of this
- * class triggers scanning (sweeping) lock manager's transactions table
- * periodically and detects such timed-out transactions
- */
-
-public class TimeOutDetector {
-
- LockManager lockMgr;
- Thread trigger;
- LinkedList<LockWaiter> victimList;
- int timeoutThreshold;
- int sweepThreshold;
-
- public TimeOutDetector(LockManager lockMgr, Executor threadExecutor) {
- this.victimList = new LinkedList<LockWaiter>();
- this.lockMgr = lockMgr;
- this.trigger = new Thread(new TimeoutTrigger(this));
- this.timeoutThreshold = lockMgr.getTransactionProperties().getTimeoutWaitThreshold();
- this.sweepThreshold = lockMgr.getTransactionProperties().getTimeoutSweepThreshold();
- trigger.setDaemon(true);
- threadExecutor.execute(trigger);
- }
-
- public void sweep() throws ACIDException {
- victimList.clear();
- // Initiates the time-out sweeping process
- // from the lockManager
- lockMgr.sweepForTimeout();
- notifyVictims();
- }
-
- public void checkAndSetVictim(LockWaiter waiterObj) {
- if (System.currentTimeMillis() - waiterObj.getBeginWaitTime() >= timeoutThreshold) {
- waiterObj.setVictim(true);
- waiterObj.setWait(false);
- victimList.add(waiterObj);
- }
- }
-
- private void notifyVictims() {
- for (LockWaiter waiterObj : victimList) {
- synchronized (waiterObj) {
- waiterObj.notifyAll();
- }
- }
- victimList.clear();
- }
-}
-
-class TimeoutTrigger implements Runnable {
-
- TimeOutDetector owner;
-
- public TimeoutTrigger(TimeOutDetector owner) {
- this.owner = owner;
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(owner.sweepThreshold);
- owner.sweep(); // Trigger the timeout detector (the owner) to
- // initiate sweep
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ACIDException e) {
- throw new IllegalStateException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 502e9c7..c1180a4 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -90,7 +90,7 @@ public class LogBuffer implements ILogBuffer {
@Override
public void append(ILogRecord logRecord, long appendLSN) {
logRecord.writeLogRecord(appendBuffer);
- if (logRecord.getLogType() != LogType.FLUSH) {
+ if (logRecord.getLogType() != LogType.FLUSH && logRecord.getLogType() != LogType.WAIT) {
logRecord.getTxnCtx().setLastLSN(appendLSN);
}
synchronized (this) {
@@ -98,7 +98,8 @@ public class LogBuffer implements ILogBuffer {
if (IS_DEBUG_MODE) {
LOGGER.info("append()| appendOffset: " + appendOffset);
}
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+ || logRecord.getLogType() == LogType.WAIT) {
logRecord.isFlushed(false);
syncCommitQ.offer(logRecord);
}
@@ -114,18 +115,19 @@ public class LogBuffer implements ILogBuffer {
public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
logRecord.writeLogRecord(appendBuffer, appendLSN);
- if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) {
- if (logRecord.getLogType() != LogType.FLUSH) {
- logRecord.getTxnCtx().setLastLSN(appendLSN);
- }
+ if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+ && logRecord.getLogType() != LogType.WAIT) {
+ logRecord.getTxnCtx().setLastLSN(appendLSN);
}
+
synchronized (this) {
appendOffset += logRecord.getLogSize();
if (IS_DEBUG_MODE) {
LOGGER.info("append()| appendOffset: " + appendOffset);
}
if (logRecord.getLogSource() == LogSource.LOCAL) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+ || logRecord.getLogType() == LogType.WAIT) {
logRecord.isFlushed(false);
syncCommitQ.offer(logRecord);
}
@@ -265,18 +267,23 @@ public class LogBuffer implements ILogBuffer {
// since this operation consisted of delete and insert, we need to notify the optracker twice
txnCtx.notifyOptracker(false);
}
+ if (TransactionSubsystem.IS_PROFILE_MODE) {
+ txnSubsystem.incrementEntityCommitCount();
+ }
} else if (logRecord.getLogType() == LogType.JOB_COMMIT
|| logRecord.getLogType() == LogType.ABORT) {
reusableJobId.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
txnCtx.notifyOptracker(true);
- notifyJobTerminator();
+ notifyJobTermination();
} else if (logRecord.getLogType() == LogType.FLUSH) {
- notifyFlushTerminator();
+ notifyFlushTermination();
+ } else if (logRecord.getLogType() == LogType.WAIT) {
+ notifyWaitTermination();
}
} else if (logRecord.getLogSource() == LogSource.REMOTE) {
if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
- notifyReplicationTerminator();
+ notifyReplicationTermination();
}
}
@@ -285,7 +292,15 @@ public class LogBuffer implements ILogBuffer {
}
}
- public void notifyJobTerminator() {
+ public void notifyJobTermination() {
+ notifyToSyncCommitQWaiter();
+ }
+
+ public void notifyWaitTermination() {
+ notifyToSyncCommitQWaiter();
+ }
+
+ public void notifyToSyncCommitQWaiter() {
ILogRecord logRecord = null;
while (logRecord == null) {
try {
@@ -300,7 +315,7 @@ public class LogBuffer implements ILogBuffer {
}
}
- public void notifyFlushTerminator() throws ACIDException {
+ public void notifyFlushTermination() throws ACIDException {
LogRecord logRecord = null;
try {
logRecord = (LogRecord) flushQ.take();
@@ -321,7 +336,7 @@ public class LogBuffer implements ILogBuffer {
}
}
- public void notifyReplicationTerminator() {
+ public void notifyReplicationTermination() {
LogRecord logRecord = null;
try {
logRecord = (LogRecord) remoteJobsQ.take();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index b8ccf04..776577a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -128,8 +128,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
protected void appendToLogTail(ILogRecord logRecord) throws ACIDException {
syncAppendToLogTail(logRecord);
- if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
- && !logRecord.isFlushed()) {
+ if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+ || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) {
synchronized (logRecord) {
while (!logRecord.isFlushed()) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index efd66a8..4112529 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -42,7 +42,7 @@ public class LogManagerWithReplication extends LogManager {
}
//only locally generated logs should be replicated
- logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL);
+ logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
//Remote flush logs do not need to be flushed separately since they may not trigger local flush
if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
@@ -62,8 +62,8 @@ public class LogManagerWithReplication extends LogManager {
}
if (logRecord.getLogSource() == LogSource.LOCAL) {
- if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
- && !logRecord.isFlushed()) {
+ if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+ || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) {
synchronized (logRecord) {
while (!logRecord.isFlushed()) {
try {
@@ -74,11 +74,13 @@ public class LogManagerWithReplication extends LogManager {
}
//wait for job Commit/Abort ACK from replicas
- while (!replicationManager.hasBeenReplicated(logRecord)) {
- try {
- logRecord.wait();
- } catch (InterruptedException e) {
- //ignore
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+ while (!replicationManager.hasBeenReplicated(logRecord)) {
+ try {
+ logRecord.wait();
+ } catch (InterruptedException e) {
+ //ignore
+ }
}
}
}