You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by gn...@apache.org on 2012/07/24 11:08:58 UTC
svn commit: r1364948 - in
/aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction:
HOWLLog.java TransactionManagerService.java
Author: gnodet
Date: Tue Jul 24 09:08:57 2012
New Revision: 1364948
URL: http://svn.apache.org/viewvc?rev=1364948&view=rev
Log:
[ARIES-881] The transaction manager is very slow in low concurrency
Added:
aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/HOWLLog.java
Modified:
aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/TransactionManagerService.java
Added: aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/HOWLLog.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/HOWLLog.java?rev=1364948&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/HOWLLog.java (added)
+++ aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/HOWLLog.java Tue Jul 24 09:08:57 2012
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.transaction;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.transaction.xa.Xid;
+
+import org.apache.geronimo.transaction.manager.LogException;
+import org.apache.geronimo.transaction.manager.Recovery;
+import org.apache.geronimo.transaction.manager.TransactionBranchInfo;
+import org.apache.geronimo.transaction.manager.TransactionBranchInfoImpl;
+import org.apache.geronimo.transaction.manager.TransactionLog;
+import org.apache.geronimo.transaction.manager.XidFactory;
+import org.objectweb.howl.log.Configuration;
+import org.objectweb.howl.log.LogClosedException;
+import org.objectweb.howl.log.LogConfigurationException;
+import org.objectweb.howl.log.LogFileOverflowException;
+import org.objectweb.howl.log.LogRecord;
+import org.objectweb.howl.log.LogRecordSizeException;
+import org.objectweb.howl.log.LogRecordType;
+import org.objectweb.howl.log.ReplayListener;
+import org.objectweb.howl.log.xa.XACommittingTx;
+import org.objectweb.howl.log.xa.XALogRecord;
+import org.objectweb.howl.log.xa.XALogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TransactionLog} implementation that stores prepare/commit/rollback records in a HOWL
+ * {@link XALogger}.
+ * <p>
+ * A prepare record is laid out as: format id (4 bytes, big-endian), global transaction id and
+ * branch qualifier of the master Xid, followed by one (branch qualifier, resource name) pair per
+ * enlisted branch. {@link #recover(XidFactory)} replays the records HOWL still reports as active
+ * and rebuilds the corresponding {@link Recovery.XidBranchesPair} entries from that layout.
+ *
+ * @version $Rev: 912058 $ $Date: 2010-02-20 09:34:14 +0800 (Sat, 20 Feb 2010) $
+ */
+public class HOWLLog implements TransactionLog {
+    // Markers stored as the first element of "done" records; they are debugging aids only.
+    // There is no PREPARE (1) constant because prepare records carry no marker byte.
+    private static final byte COMMIT = 2;
+    private static final byte ROLLBACK = 3;
+
+    static final String[] TYPE_NAMES = {null, "PREPARE", "COMMIT", "ROLLBACK"};
+
+    private static final Logger log = LoggerFactory.getLogger(HOWLLog.class);
+
+    private File serverBaseDir;
+    private String logFileDir;
+
+    private final XidFactory xidFactory;
+
+    private final XALogger logger;
+    private final Configuration configuration = new Configuration();
+    private boolean started = false;
+    private Map<Xid, Recovery.XidBranchesPair> recovered;
+
+    /**
+     * Creates the log and its HOWL configuration. The log is not opened until {@link #doStart()}.
+     *
+     * @param logFileDir    directory for the log files; only remembered here and applied to the
+     *                      HOWL configuration by {@link #doStart()}
+     * @param xidFactory    factory used to rebuild Xids when the log is replayed at start-up
+     * @param serverBaseDir base directory against which a relative {@code logFileDir} is resolved
+     * @throws IOException               declared for the {@link XALogger} construction
+     * @throws LogConfigurationException if HOWL rejects one of the buffer settings
+     */
+    public HOWLLog(String bufferClassName,
+                   int bufferSize,
+                   boolean checksumEnabled,
+                   boolean adler32Checksum,
+                   int flushSleepTimeMilliseconds,
+                   String logFileDir,
+                   String logFileExt,
+                   String logFileName,
+                   int maxBlocksPerFile,
+                   int maxBuffers,
+                   int maxLogFiles,
+                   int minBuffers,
+                   int threadsWaitingForceThreshold,
+                   boolean flushPartialBuffers,
+                   XidFactory xidFactory,
+                   File serverBaseDir) throws IOException, LogConfigurationException {
+        this.serverBaseDir = serverBaseDir;
+        setBufferClassName(bufferClassName);
+        setBufferSizeKBytes(bufferSize);
+        setChecksumEnabled(checksumEnabled);
+        setAdler32Checksum(adler32Checksum);
+        setFlushSleepTimeMilliseconds(flushSleepTimeMilliseconds);
+        // Deliberately not setLogFileDir(logFileDir): the directory is resolved in doStart().
+        this.logFileDir = logFileDir;
+        setLogFileExt(logFileExt);
+        setLogFileName(logFileName);
+        setMaxBlocksPerFile(maxBlocksPerFile);
+        setMaxBuffers(maxBuffers);
+        setMaxLogFiles(maxLogFiles);
+        setMinBuffers(minBuffers);
+        setThreadsWaitingForceThreshold(threadsWaitingForceThreshold);
+        setFlushPartialBuffers(flushPartialBuffers);
+        this.xidFactory = xidFactory;
+        this.logger = new XALogger(configuration);
+    }
+
+    /** Returns the log directory exactly as it was configured (possibly relative). */
+    public String getLogFileDir() {
+        return logFileDir;
+    }
+
+    /**
+     * Remembers the log directory. While the log is started, the directory (resolved against
+     * the server base directory when it is not absolute) is also written to the HOWL
+     * configuration; before that it is only stored and applied later by {@link #doStart()}.
+     */
+    public void setLogFileDir(String logDirName) {
+        File logDir = new File(logDirName);
+        if (!logDir.isAbsolute()) {
+            logDir = new File(serverBaseDir, logDirName);
+        }
+
+        this.logFileDir = logDirName;
+        if (started) {
+            configuration.setLogFileDir(logDir.getAbsolutePath());
+        }
+    }
+
+    // The accessors below delegate straight to the HOWL Configuration object.
+
+    public String getLogFileExt() {
+        return configuration.getLogFileExt();
+    }
+
+    public void setLogFileExt(String logFileExt) {
+        configuration.setLogFileExt(logFileExt);
+    }
+
+    public String getLogFileName() {
+        return configuration.getLogFileName();
+    }
+
+    public void setLogFileName(String logFileName) {
+        configuration.setLogFileName(logFileName);
+    }
+
+    public boolean isChecksumEnabled() {
+        return configuration.isChecksumEnabled();
+    }
+
+    public void setChecksumEnabled(boolean checksumOption) {
+        configuration.setChecksumEnabled(checksumOption);
+    }
+
+    public boolean isAdler32ChecksumEnabled() {
+        return configuration.isAdler32ChecksumEnabled();
+    }
+
+    public void setAdler32Checksum(boolean checksumOption) {
+        configuration.setAdler32Checksum(checksumOption);
+    }
+
+    public int getBufferSizeKBytes() {
+        return configuration.getBufferSize();
+    }
+
+    public void setBufferSizeKBytes(int bufferSize) throws LogConfigurationException {
+        configuration.setBufferSize(bufferSize);
+    }
+
+    public String getBufferClassName() {
+        return configuration.getBufferClassName();
+    }
+
+    public void setBufferClassName(String bufferClassName) {
+        configuration.setBufferClassName(bufferClassName);
+    }
+
+    public int getMaxBuffers() {
+        return configuration.getMaxBuffers();
+    }
+
+    public void setMaxBuffers(int maxBuffers) throws LogConfigurationException {
+        configuration.setMaxBuffers(maxBuffers);
+    }
+
+    public int getMinBuffers() {
+        return configuration.getMinBuffers();
+    }
+
+    public void setMinBuffers(int minBuffers) throws LogConfigurationException {
+        configuration.setMinBuffers(minBuffers);
+    }
+
+    public int getFlushSleepTimeMilliseconds() {
+        return configuration.getFlushSleepTime();
+    }
+
+    public void setFlushSleepTimeMilliseconds(int flushSleepTime) {
+        configuration.setFlushSleepTime(flushSleepTime);
+    }
+
+    public int getThreadsWaitingForceThreshold() {
+        return configuration.getThreadsWaitingForceThreshold();
+    }
+
+    /** Sets the force threshold; {@code -1} is stored as {@link Integer#MAX_VALUE}. */
+    public void setThreadsWaitingForceThreshold(int threadsWaitingForceThreshold) {
+        int effective = threadsWaitingForceThreshold == -1 ? Integer.MAX_VALUE : threadsWaitingForceThreshold;
+        configuration.setThreadsWaitingForceThreshold(effective);
+    }
+
+    public int getMaxBlocksPerFile() {
+        return configuration.getMaxBlocksPerFile();
+    }
+
+    /** Sets the per-file block limit; {@code -1} is stored as {@link Integer#MAX_VALUE}. */
+    public void setMaxBlocksPerFile(int maxBlocksPerFile) {
+        int effective = maxBlocksPerFile == -1 ? Integer.MAX_VALUE : maxBlocksPerFile;
+        configuration.setMaxBlocksPerFile(effective);
+    }
+
+    public int getMaxLogFiles() {
+        return configuration.getMaxLogFiles();
+    }
+
+    public void setMaxLogFiles(int maxLogFiles) {
+        configuration.setMaxLogFiles(maxLogFiles);
+    }
+
+    public boolean isFlushPartialBuffers() {
+        return configuration.isFlushPartialBuffers();
+    }
+
+    public void setFlushPartialBuffers(boolean flushPartialBuffers) {
+        configuration.setFlushPartialBuffers(flushPartialBuffers);
+    }
+
+    /**
+     * Applies the configured log directory, opens the HOWL log and replays its active
+     * transactions into a fresh map held by this instance.
+     */
+    public void doStart() throws Exception {
+        // 'started' must be set first: setLogFileDir only touches the configuration once it is true.
+        started = true;
+        setLogFileDir(logFileDir);
+        log.debug("Initiating transaction manager recovery");
+        recovered = new HashMap<Xid, Recovery.XidBranchesPair>();
+
+        logger.open(null);
+
+        ReplayListener replayListener = new GeronimoReplayListener(xidFactory, recovered);
+        logger.replayActiveTx(replayListener);
+
+        log.debug("In doubt transactions recovered from log");
+    }
+
+    /** Closes the HOWL log and drops the map filled by {@link #doStart()}. */
+    public void doStop() throws Exception {
+        started = false;
+        logger.close();
+        recovered = null;
+    }
+
+    /** No-op. */
+    public void doFail() {
+    }
+
+    /** No-op: nothing is written to the log when a transaction begins. */
+    public void begin(Xid xid) throws LogException {
+    }
+
+    /**
+     * Writes a prepare record for the given transaction and its branches.
+     *
+     * @param xid      master Xid of the transaction
+     * @param branches enlisted branches; each contributes its branch qualifier and resource name
+     * @return the HOWL {@link XACommittingTx} handle, to be passed back as {@code logMark} to
+     *         {@link #commit(Xid, Object)} or {@link #rollback(Xid, Object)}
+     * @throws LogException          if HOWL reports an I/O failure
+     * @throws IllegalStateException if the log is closed, the record is rejected, or the calling
+     *                               thread is interrupted (its interrupt status is restored)
+     */
+    public Object prepare(Xid xid, List<? extends TransactionBranchInfo> branches) throws LogException {
+        byte[][] data = new byte[3 + 2 * branches.size()][];
+        data[0] = intToBytes(xid.getFormatId());
+        data[1] = xid.getGlobalTransactionId();
+        data[2] = xid.getBranchQualifier();
+        int next = 3;
+        for (TransactionBranchInfo branch : branches) {
+            data[next++] = branch.getBranchXid().getBranchQualifier();
+            // NOTE(review): the name is encoded with the platform default charset and decoded the
+            // same way on replay; left unchanged so records already on disk stay readable.
+            data[next++] = branch.getResourceName().getBytes();
+        }
+        try {
+            return logger.putCommit(data);
+        } catch (LogClosedException e) {
+            throw illegalState(e);
+        } catch (LogRecordSizeException e) {
+            throw illegalState(e);
+        } catch (LogFileOverflowException e) {
+            throw illegalState(e);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw illegalState(e);
+        } catch (IOException e) {
+            throw new LogException(e);
+        }
+    }
+
+    /**
+     * Marks the transaction prepared under {@code logMark} as committed.
+     *
+     * @param logMark the handle returned by {@link #prepare(Xid, List)}
+     */
+    public void commit(Xid xid, Object logMark) throws LogException {
+        writeDone(COMMIT, xid, logMark);
+    }
+
+    /**
+     * Marks the transaction prepared under {@code logMark} as rolled back.
+     *
+     * @param logMark the handle returned by {@link #prepare(Xid, List)}
+     */
+    public void rollback(Xid xid, Object logMark) throws LogException {
+        writeDone(ROLLBACK, xid, logMark);
+    }
+
+    /** Writes the "done" record shared by commit and rollback; only the marker byte differs. */
+    private void writeDone(byte marker, Xid xid, Object logMark) throws LogException {
+        // The data is theoretically unnecessary but is included to help with debugging and
+        // because HOWL currently requires it.
+        byte[][] data = new byte[4][];
+        data[0] = new byte[]{marker};
+        data[1] = intToBytes(xid.getFormatId());
+        data[2] = xid.getGlobalTransactionId();
+        data[3] = xid.getBranchQualifier();
+        try {
+            logger.putDone(data, (XACommittingTx) logMark);
+        } catch (LogClosedException e) {
+            throw illegalState(e);
+        } catch (LogRecordSizeException e) {
+            throw illegalState(e);
+        } catch (LogFileOverflowException e) {
+            throw illegalState(e);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw illegalState(e);
+        } catch (IOException e) {
+            throw new LogException(e);
+        }
+    }
+
+    /** Wraps a HOWL failure in the unchecked exception type this class has always thrown. */
+    private static IllegalStateException illegalState(Throwable cause) {
+        return new IllegalStateException(cause);
+    }
+
+    /**
+     * Replays the active transactions of the HOWL log and returns them.
+     *
+     * @param xidFactory factory used to rebuild the Xids found in the log
+     * @return one {@link Recovery.XidBranchesPair} per replayed prepare record
+     */
+    public Collection<Recovery.XidBranchesPair> recover(XidFactory xidFactory) throws LogException {
+        log.debug("Initiating transaction manager recovery");
+        Map<Xid, Recovery.XidBranchesPair> recovered = new HashMap<Xid, Recovery.XidBranchesPair>();
+        ReplayListener replayListener = new GeronimoReplayListener(xidFactory, recovered);
+        logger.replayActiveTx(replayListener);
+        log.debug("In doubt transactions recovered from log");
+        return recovered.values();
+    }
+
+    /** Returns the statistics string reported by the HOWL logger. */
+    public String getXMLStats() {
+        return logger.getStats();
+    }
+
+    /** Always returns 0; this statistic is not wired to the HOWL logger. */
+    public int getAverageForceTime() {
+        return 0;
+    }
+
+    /** Always returns 0; this statistic is not wired to the HOWL logger. */
+    public int getAverageBytesPerForce() {
+        return 0;
+    }
+
+    /** Encodes an int as 4 bytes, most significant byte first. */
+    private static byte[] intToBytes(int formatId) {
+        byte[] buffer = new byte[4];
+        buffer[0] = (byte) (formatId >> 24);
+        buffer[1] = (byte) (formatId >> 16);
+        buffer[2] = (byte) (formatId >> 8);
+        buffer[3] = (byte) formatId;
+        return buffer;
+    }
+
+    /**
+     * Decodes the 4-byte big-endian value produced by {@link #intToBytes(int)}.
+     * <p>
+     * Each byte is masked to undo sign extension, and each shift is parenthesised because
+     * {@code +} binds tighter than {@code <<}; without that the shift distances are wrong.
+     */
+    private static int bytesToInt(byte[] buffer) {
+        return ((buffer[0] & 0xFF) << 24)
+                | ((buffer[1] & 0xFF) << 16)
+                | ((buffer[2] & 0xFF) << 8)
+                | (buffer[3] & 0xFF);
+    }
+
+    /**
+     * Replay callback that turns each XACOMMIT (prepare) record into a
+     * {@link Recovery.XidBranchesPair} and stores it in the supplied map.
+     */
+    private static final class GeronimoReplayListener implements ReplayListener {
+
+        private final XidFactory xidFactory;
+        private final Map<Xid, Recovery.XidBranchesPair> recoveredTx;
+
+        public GeronimoReplayListener(XidFactory xidFactory, Map<Xid, Recovery.XidBranchesPair> recoveredTx) {
+            this.xidFactory = xidFactory;
+            this.recoveredTx = recoveredTx;
+        }
+
+        public void onRecord(LogRecord plainlr) {
+            XALogRecord lr = (XALogRecord) plainlr;
+            short recordType = lr.type;
+            XACommittingTx tx = lr.getTx();
+            if (recordType == LogRecordType.XACOMMIT) {
+
+                byte[][] data = tx.getRecord();
+
+                // Same layout as written by prepare(): [0] format id, [1] global id, [2] branch id.
+                assert data[0].length == 4;
+                int formatId = bytesToInt(data[0]);
+                byte[] globalId = data[1];
+                byte[] branchId = data[2];
+                Xid masterXid = xidFactory.recover(formatId, globalId, branchId);
+
+                Recovery.XidBranchesPair xidBranchesPair = new Recovery.XidBranchesPair(masterXid, tx);
+                recoveredTx.put(masterXid, xidBranchesPair);
+                log.debug("recovered prepare record for master xid: {}", masterXid);
+                // Remaining elements are (branch qualifier, resource name) pairs.
+                for (int i = 3; i < data.length; i += 2) {
+                    byte[] branchBranchId = data[i];
+                    String name = new String(data[i + 1]);
+
+                    Xid branchXid = xidFactory.recover(formatId, globalId, branchBranchId);
+                    TransactionBranchInfoImpl branchInfo = new TransactionBranchInfoImpl(branchXid, name);
+                    xidBranchesPair.addBranch(branchInfo);
+                    log.debug("recovered branch for resource manager, branchId {}, {}", name, branchXid);
+                }
+            } else if (recordType != LogRecordType.END_OF_LOG) {
+                // END_OF_LOG crops up every time the server is started, so it is not reported.
+                log.warn("Received unexpected log record: {} ({})", lr, recordType);
+            }
+        }
+
+        public void onError(org.objectweb.howl.log.LogException exception) {
+            log.error("Error during recovery: ", exception);
+        }
+
+        public LogRecord getLogRecord() {
+            //TODO justify this size estimate
+            return new LogRecord(10 * 2 * Xid.MAXBQUALSIZE);
+        }
+
+    }
+}
Modified: aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/TransactionManagerService.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/TransactionManagerService.java?rev=1364948&r1=1364947&r2=1364948&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/TransactionManagerService.java (original)
+++ aries/trunk/transaction/transaction-manager/src/main/java/org/apache/aries/transaction/TransactionManagerService.java Tue Jul 24 09:08:57 2012
@@ -27,7 +27,6 @@ import javax.transaction.UserTransaction
import javax.transaction.xa.XAException;
import org.apache.aries.util.AriesFrameworkUtil;
-import org.apache.geronimo.transaction.log.HOWLLog;
import org.apache.geronimo.transaction.log.UnrecoverableLog;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
@@ -57,6 +56,7 @@ public class TransactionManagerService {
public static final String HOWL_MIN_BUFFERS = "aries.transaction.howl.minBuffers";
public static final String HOWL_THREADS_WAITING_FORCE_THRESHOLD = "aries.transaction.howl.threadsWaitingForceThreshold";
public static final String HOWL_LOG_FILE_DIR = "aries.transaction.howl.logFileDir";
+ public static final String HOWL_FLUSH_PARTIAL_BUFFERS = "aries.transaction.flushPartialBuffers";
public static final int DEFAULT_TRANSACTION_TIMEOUT = 600; // 600 seconds -> 10 minutes
public static final boolean DEFAULT_RECOVERABLE = false; // not recoverable by default
@@ -87,7 +87,7 @@ public class TransactionManagerService {
// Transaction log
if (getBool(RECOVERABLE, DEFAULT_RECOVERABLE)) {
String bufferClassName = getString(HOWL_BUFFER_CLASS_NAME, "org.objectweb.howl.log.BlockLogBuffer");
- int bufferSizeKBytes = getInt(HOWL_BUFFER_SIZE, 32);
+ int bufferSizeKBytes = getInt(HOWL_BUFFER_SIZE, 4);
if (bufferSizeKBytes < 1 || bufferSizeKBytes > 32) {
throw new ConfigurationException(HOWL_BUFFER_SIZE, NLS.MESSAGES.getMessage("buffer.size.between.one.and.thirtytwo"));
}
@@ -107,6 +107,7 @@ public class TransactionManagerService {
throw new ConfigurationException(HOWL_MAX_BUFFERS, NLS.MESSAGES.getMessage("max.buffers.greaterthan.min.buffers"));
}
int threadsWaitingForceThreshold = getInt(HOWL_THREADS_WAITING_FORCE_THRESHOLD, -1);
+ boolean flushPartialBuffers = getBool(HOWL_FLUSH_PARTIAL_BUFFERS, true);
String logFileDir = getString(HOWL_LOG_FILE_DIR, null);
if (logFileDir == null || logFileDir.length() == 0 || !new File(logFileDir).isAbsolute()) {
throw new ConfigurationException(HOWL_LOG_FILE_DIR, NLS.MESSAGES.getMessage("log.file.dir"));
@@ -125,6 +126,7 @@ public class TransactionManagerService {
maxLogFiles,
minBuffers,
threadsWaitingForceThreshold,
+ flushPartialBuffers,
xidFactory,
null);
((HOWLLog) transactionLog).doStart();