Posted to dev@ofbiz.apache.org by David E Jones <jo...@hotwaxmedia.com> on 2008/07/01 07:00:03 UTC
Re: SequenceUtil and ConcurrentHashMap(non-blocking)
I'd have to second the request for a patch... either here or on a Jira
issue. As familiar as I am with that class I don't have it memorized
enough to be able to pick out the differences.
Anyway, yes, looking forward to seeing more.
-David
On Jun 30, 2008, at 10:21 AM, Adam Heath wrote:
> So, one year ago I was tasked with fixing some deadlocks in
> Webslinger (the website container Brainfood has developed). While
> investigating solutions to the problems I was seeing, I stumbled
> across the book, "Java Concurrency in Practice".
>
> OH
> MY
> GOD
> !
>
> It's not normal for me to get all ecstatic about a programming book,
> but this one has to be at least an 11 (on a scale of 1-10). Anyone
> doing *any* multi-threaded programming in Java absolutely *must*
> read this book.
>
> Anyway, after reading said book, I completely rewrote the internals
> of Webslinger, following the guidelines I had read about. It's now
> been a year, and what I figured out seems to be running great; no
> normal code paths through Webslinger take out *any* locks now.
>
> I've decided to start applying this knowledge against OFBiz. My
> first attempt is attached.
>
> While SequenceUtil is not a highly-contended resource, it is simple
> enough, and used often enough, so that others can understand what is
> going on, and it's possible to see if the change actually breaks
> anything.
>
> Summarizing, here is how it functions (at a high level).
>
> 1: Any fields that have to be modified together have to be moved to
> a separate class.
> 2: Said class is made final, and *all* fields are also made final;
> equals() and hashCode() are implemented.
> 3: The parent class is modified to use an AtomicReference.
> 4: Any time the parent class needs to change one of the original
> fields, it makes a copy of the current reference with the new
> values, then does an atomic compareAndSet. This can cause multiple
> allocations when contended; however, Java 1.6 is smart enough to do
> some allocations on the stack, so in general this is not a problem.
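
As an illustration of the four steps listed above, here is a minimal sketch on a much smaller class than SequenceUtil. The names Range, RangeCounter and RangeCounterDemo are hypothetical and exist only for this example; nothing here touches a database or OFBiz. It assumes only java.util.concurrent.atomic.AtomicReference from the JDK.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical names for illustration; this is not the OFBiz SequenceUtil code.

// Steps 1 and 2: the fields that change together live in one final class
// whose fields are all final.
final class Range {
    final long cur; // next id to hand out
    final long max; // first id past the end of the range

    Range(long cur, long max) {
        this.cur = cur;
        this.max = max;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Range)) return false;
        Range other = (Range) o;
        return cur == other.cur && max == other.max;
    }

    @Override
    public int hashCode() {
        long r = cur * 31 + max;
        return (int) (r ^ (r >>> 32));
    }
}

// Step 3: the parent holds the state through an AtomicReference.
final class RangeCounter {
    private final AtomicReference<Range> ref;

    RangeCounter(long start, long max) {
        ref = new AtomicReference<Range>(new Range(start, max));
    }

    // Step 4: copy, modify, compareAndSet; retry if another thread got in first.
    // Returns the next id, or -1 once the range is exhausted.
    long next() {
        while (true) {
            Range current = ref.get(); // one read gives a consistent pair
            if (current.cur >= current.max) return -1;
            Range updated = new Range(current.cur + 1, current.max);
            // compareAndSet compares references (==), not equals(), so the
            // expected value has to be the exact instance read from ref.
            if (ref.compareAndSet(current, updated)) return current.cur;
        }
    }
}

class RangeCounterDemo {
    public static void main(String[] args) {
        RangeCounter counter = new RangeCounter(10, 12);
        System.out.println(counter.next());
        System.out.println(counter.next());
        System.out.println(counter.next());
    }
}
```

Running RangeCounterDemo prints 10, 11 and then -1. The point worth noting is the expected value passed to compareAndSet: it must be the instance that was read from the reference, because the comparison is by identity.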
>
> These changes invariably increase the size of the code; however,
> they do reduce overhead in the long run, so I consider them
> beneficial.
>
> Attached you will find SequenceUtil.java; I've modified it to be
> non-blocking. I've done "ant run-install", with no problems, but have
> *not* yet run any test cases.
>
> I'm sending it here (to this list) first, before checking it in, as
> this is a radical departure from simple programming practices, and
> I would like others to discuss whether I should do this in other places.
> /*******************************************************************************
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing,
> * software distributed under the License is distributed on an
> * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> * KIND, either express or implied. See the License for the
> * specific language governing permissions and limitations
> * under the License.
> *******************************************************************************/
> package org.ofbiz.entity.util;
>
> import java.sql.Connection;
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.sql.Statement;
> import java.util.Hashtable;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.atomic.AtomicReference;
>
> import javax.transaction.Transaction;
>
> import org.ofbiz.base.util.Debug;
> import org.ofbiz.entity.GenericEntityException;
> import org.ofbiz.entity.jdbc.ConnectionFactory;
> import org.ofbiz.entity.model.ModelEntity;
> import org.ofbiz.entity.model.ModelField;
> import org.ofbiz.entity.transaction.GenericTransactionException;
> import org.ofbiz.entity.transaction.TransactionUtil;
>
> /**
> * Sequence Utility to get unique sequences from named sequence banks
> * Uses a collision detection approach to safely get unique sequenced ids in banks from the database
> */
> public class SequenceUtil {
>
> public static final String module = SequenceUtil.class.getName();
>
> ConcurrentHashMap<String, SequenceBank> sequences = new ConcurrentHashMap<String, SequenceBank>();
> String helperName;
> ModelEntity seqEntity;
> String tableName;
> String nameColName;
> String idColName;
>
> private SequenceUtil() {}
>
> public SequenceUtil(String helperName, ModelEntity seqEntity,
> String nameFieldName, String idFieldName) {
> this.helperName = helperName;
> this.seqEntity = seqEntity;
> if (seqEntity == null) {
> throw new IllegalArgumentException("The sequence model entity was null but is required.");
> }
> this.tableName = seqEntity.getTableName(helperName);
>
> ModelField nameField = seqEntity.getField(nameFieldName);
>
> if (nameField == null) {
> throw new IllegalArgumentException("Could not find the field definition for the sequence name field " + nameFieldName);
> }
> this.nameColName = nameField.getColName();
>
> ModelField idField = seqEntity.getField(idFieldName);
>
> if (idField == null) {
> throw new IllegalArgumentException("Could not find the field definition for the sequence id field " + idFieldName);
> }
> this.idColName = idField.getColName();
> }
>
> public Long getNextSeqId(String seqName, long staggerMax,
> ModelEntity seqModelEntity) {
> SequenceBank bank = this.getBank(seqName, seqModelEntity);
> return bank.getNextSeqId(staggerMax);
> }
>
> public void forceBankRefresh(String seqName, long staggerMax) {
> // don't use the get method because we don't want to create if it fails
> SequenceBank bank = sequences.get(seqName);
> if (bank == null) {
> return;
> }
>
> bank.refresh(staggerMax);
> }
>
> private SequenceBank getBank(String seqName, ModelEntity
> seqModelEntity) {
> SequenceBank bank = sequences.get(seqName);
>
> if (bank == null) {
> bank = new SequenceBank(seqName, seqModelEntity, this);
> SequenceBank oldBank = sequences.putIfAbsent(seqName,
> bank);
> if (oldBank != null) bank = oldBank;
> }
>
> return bank;
> }
>
> class SequenceBank {
> public static final long defaultBankSize = 10;
> public static final long maxBankSize = 5000;
> public static final long startSeqId = 10000;
> public static final int minWaitMillis = 5;
> public static final int maxWaitMillis = 50;
> public static final int maxTries = 5;
>
> final AtomicReference<SequenceValue> ref;
> String seqName;
> SequenceUtil parentUtil;
> ModelEntity seqModelEntity;
>
> public SequenceBank(String seqName, ModelEntity
> seqModelEntity, SequenceUtil parentUtil) {
> this.seqName = seqName;
> this.parentUtil = parentUtil;
> this.seqModelEntity = seqModelEntity;
> ref = new AtomicReference<SequenceValue>(new
> SequenceValue(0, 0).fillBank(1));
> }
>
> public Long getNextSeqId(long staggerMax) {
> long stagger = 1;
> if (staggerMax > 1) {
> stagger = Math.round(Math.random() * staggerMax);
> if (stagger == 0) stagger = 1;
> }
> SequenceValue current;
> SequenceValue next;
> do {
> current = ref.get();
> next = current;
> if ((current.curSeqId + stagger) > current.maxSeqId) {
> next = current.fillBank(stagger);
> if ((next.curSeqId + stagger) > next.maxSeqId) {
> Debug.logError("[SequenceUtil.SequenceBank.getNextSeqId] Fill bank failed, returning null", module);
> return null;
> }
> }
> // compareAndSet compares references, so the expected value must be the
> // instance read from ref, not the freshly filled copy
> } while (!ref.compareAndSet(current, new SequenceValue(next.curSeqId + stagger, next.maxSeqId)));
> return Long.valueOf(next.curSeqId);
> }
>
> public void refresh(long staggerMax) {
> SequenceValue value;
> do {
> value = ref.get();
> } while (!ref.compareAndSet(value,
> value.refresh(staggerMax)));
> }
>
>
>
> protected final class SequenceValue {
> protected final long curSeqId;
> protected final long maxSeqId;
>
> protected SequenceValue(long curSeqId, long maxSeqId) {
> this.curSeqId = curSeqId;
> this.maxSeqId = maxSeqId;
> }
>
> public int hashCode() {
> long r = curSeqId ^ maxSeqId;
> return (int) (r ^ (r >>> 32));
> }
>
> public boolean equals(Object o) {
> if (!(o instanceof SequenceValue)) return false;
> SequenceValue other = (SequenceValue) o;
> return curSeqId == other.curSeqId && maxSeqId ==
> other.maxSeqId;
> }
>
> protected SequenceValue refresh(long staggerMax) {
> // start from maxSeqId so the remaining ids are treated as used and a new bank is fetched
> return fillBank(staggerMax, this.maxSeqId);
> }
>
> protected SequenceValue fillBank(long stagger) {
> return fillBank(stagger, this.curSeqId);
> }
>
> private SequenceValue fillBank(long stagger, long curSeqId) {
> //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Starting fillBank Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
>
> // no need to get a new bank, SeqIds available
> if ((curSeqId + stagger) <= maxSeqId) return this;
>
> long bankSize = defaultBankSize;
> if (seqModelEntity != null &&
> seqModelEntity.getSequenceBankSize() != null) {
> bankSize =
> seqModelEntity.getSequenceBankSize().longValue();
> }
> if (stagger > 1) {
> // NOTE: could use staggerMax for this, but if that is done it would be easier to guess a valid next id without a brute force attack
> bankSize = stagger * defaultBankSize;
> }
>
> if (bankSize > maxBankSize) bankSize = maxBankSize;
>
> long val1 = 0;
> long val2 = 0;
>
> // NOTE: the fancy ethernet type stuff is for the case where transactions not available, or something funny happens with
> // really sensitive timing (between first select and update, for example)
> int numTries = 0;
>
> while (val1 + bankSize != val2) {
> if (Debug.verboseOn()) Debug.logVerbose("[SequenceUtil.SequenceBank.fillBank] Trying to get a bank of sequenced ids for " +
> seqName + "; start of loop val1=" + val1 + ", val2=" + val2 + ", bankSize=" + bankSize, module);
>
> // not sure if this synchronized block is totally necessary, the method is synchronized but it does do a wait/sleep
> //outside of this block, and this is the really sensitive block, so making sure it is isolated; there is some overhead
> //to this, but very bad things can happen if we try to do too many of these at once for a single sequencer
> Transaction suspendedTransaction = null;
> try {
> //if we can suspend the transaction, we'll try to do this in a local manual transaction
> suspendedTransaction = TransactionUtil.suspend();
>
> boolean beganTransaction = false;
> try {
> beganTransaction = TransactionUtil.begin();
>
> Connection connection = null;
> Statement stmt = null;
> ResultSet rs = null;
>
> try {
> connection =
> ConnectionFactory.getConnection(parentUtil.helperName);
> } catch (SQLException sqle) {
>
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to establish a connection with the database... Error was: " + sqle.toString(), module);
> throw sqle;
> } catch (GenericEntityException e) {
>
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank]: Unable to establish a connection with the database... Error was: " + e.toString(), module);
> throw e;
> }
>
> if (connection == null) {
> throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank]: Unable to establish a connection with the database, connection was null...");
> }
>
> String sql = null;
>
> try {
> // we shouldn't need this, and some TX managers complain about it, so not including it: connection.setAutoCommit(false);
>
> stmt = connection.createStatement();
>
> sql = "SELECT " +
> parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " +
> parentUtil.nameColName + "='" + seqName + "'";
> rs = stmt.executeQuery(sql);
> boolean gotVal1 = false;
> if (rs.next()) {
> val1 =
> rs.getLong(parentUtil.idColName);
> gotVal1 = true;
> }
> rs.close();
>
> if (!gotVal1) {
>
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] first select failed: will try to add new row, result set was empty for sequence [" + seqName + "] \nUsed SQL: " + sql + " \n Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
> sql = "INSERT INTO " +
> parentUtil.tableName + " (" + parentUtil.nameColName + ", " +
> parentUtil.idColName + ") VALUES ('" + seqName + "', " + startSeqId
> + ")";
> if (stmt.executeUpdate(sql) <= 0) {
> throw new GenericEntityException("No rows changed when trying to insert a new sequence row with this SQL: " + sql);
> }
> continue;
> }
>
> sql = "UPDATE " +
> parentUtil.tableName + " SET " + parentUtil.idColName + "=" +
> parentUtil.idColName + "+" + bankSize + " WHERE " +
> parentUtil.nameColName + "='" + seqName + "'";
> if (stmt.executeUpdate(sql) <= 0) {
> throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank] update failed, no rows changed for seqName: " + seqName);
> }
>
> sql = "SELECT " +
> parentUtil.idColName + " FROM " + parentUtil.tableName + " WHERE " +
> parentUtil.nameColName + "='" + seqName + "'";
> rs = stmt.executeQuery(sql);
> boolean gotVal2 = false;
> if (rs.next()) {
> val2 =
> rs.getLong(parentUtil.idColName);
> gotVal2 = true;
> }
>
> rs.close();
>
> if (!gotVal2) {
> throw new GenericEntityException("[SequenceUtil.SequenceBank.fillBank] second select failed: aborting, result set was empty for sequence: " + seqName);
> }
>
> // got val1 and val2 at this point, if we don't have the right difference between them, force a rollback (with
> //setRollbackOnly and NOT with an exception because we don't want to break from the loop, just err out and
> //continue), then flow out to allow the wait and loop thing to happen
> if (val1 + bankSize != val2) {
>
> TransactionUtil.setRollbackOnly("Forcing transaction rollback in sequence increment because we didn't get a clean update, ie a conflict was found, so not saving the results", null);
> }
> } catch (SQLException sqle) {
> Debug.logWarning(sqle, "[SequenceUtil.SequenceBank.fillBank] SQL Exception while executing the following:\n" + sql + "\nError was:" + sqle.getMessage(), module);
> throw sqle;
> } finally {
> try {
> if (stmt != null) stmt.close();
> } catch (SQLException sqle) {
> Debug.logWarning(sqle, "Error closing statement in sequence util", module);
> }
> try {
> if (connection != null)
> connection.close();
> } catch (SQLException sqle) {
> Debug.logWarning(sqle, "Error closing connection in sequence util", module);
> }
> }
> } catch (Exception e) {
> String errMsg = "General error in getting a sequenced ID";
> Debug.logError(e, errMsg, module);
> try {
>
> TransactionUtil.rollback(beganTransaction, errMsg, e);
> } catch (GenericTransactionException gte2) {
> Debug.logError(gte2, "Unable to rollback transaction", module);
> }
>
> // error, break out of the loop to not try to continue forever
> break;
> } finally {
> try {
>
> TransactionUtil.commit(beganTransaction);
> } catch (GenericTransactionException gte) {
> Debug.logError(gte, "Unable to commit sequence increment transaction, continuing anyway though", module);
> }
> }
> } catch (GenericTransactionException e) {
> Debug.logError(e, "System Error suspending transaction in sequence util", module);
> } finally {
> if (suspendedTransaction != null) {
> try {
>
> TransactionUtil.resume(suspendedTransaction);
> } catch (GenericTransactionException e) {
> Debug.logError(e, "Error resuming suspended transaction in sequence util", module);
> }
> }
> }
>
> if (val1 + bankSize != val2) {
> if (numTries >= maxTries) {
> String errMsg = "[SequenceUtil.SequenceBank.fillBank] maxTries (" + maxTries + ") reached for seqName [" + seqName + "], giving up.";
> Debug.logError(errMsg, module);
> return this;
> }
>
> // collision happened, wait a bounded random amount of time then continue
> int waitTime = (Double.valueOf(Math.random() * (maxWaitMillis - minWaitMillis))).intValue() + minWaitMillis;
>
>
> Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Collision found for seqName [" + seqName + "], val1=" + val1 + ", val2=" + val2 + ", val1+bankSize=" + (val1 + bankSize) + ", bankSize=" + bankSize + ", waitTime=" + waitTime, module);
>
> try {
> // using the Thread.sleep to more reliably lock this thread: this.wait(waitTime);
> java.lang.Thread.sleep(waitTime);
> } catch (Exception e) {
> Debug.logWarning(e, "Error waiting in sequence util", module);
> return this;
> }
> }
>
> numTries++;
> }
>
> if (Debug.infoOn()) Debug.logInfo("Got bank of sequenced IDs for [" + seqName + "]; curSeqId=" + val1 + ", maxSeqId=" + val2 + ", bankSize=" + bankSize, module);
> return new SequenceValue(val1, val2);
> //Debug.logWarning("[SequenceUtil.SequenceBank.fillBank] Ending fillBank Thread Name is: " + Thread.currentThread().getName() + ":" + Thread.currentThread().toString(), module);
> }
>
> }
> }
> }
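
The get-or-create step in getBank() above relies on ConcurrentHashMap.putIfAbsent(). For anyone who wants to try that idiom in isolation, here is a minimal standalone sketch. BankRegistry and BankRegistryDemo are hypothetical names, and StringBuilder is only a stand-in for SequenceBank so the example needs nothing beyond the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names for illustration; StringBuilder stands in for SequenceBank.
final class BankRegistry {
    private final ConcurrentHashMap<String, StringBuilder> banks =
            new ConcurrentHashMap<String, StringBuilder>();

    StringBuilder getOrCreate(String name) {
        StringBuilder bank = banks.get(name);
        if (bank == null) {
            // Speculatively build a new value; it is discarded if we lose the race.
            StringBuilder created = new StringBuilder(name);
            // putIfAbsent returns the previous value, or null if ours was stored.
            StringBuilder raced = banks.putIfAbsent(name, created);
            bank = (raced != null) ? raced : created;
        }
        return bank;
    }
}

class BankRegistryDemo {
    public static void main(String[] args) {
        BankRegistry registry = new BankRegistry();
        StringBuilder first = registry.getOrCreate("Party");
        StringBuilder second = registry.getOrCreate("Party");
        System.out.println(first == second); // true: same instance both times
    }
}
```

One trade-off of this idiom is that a losing thread constructs a value that is then thrown away. That is harmless for a StringBuilder, but in the attached class the SequenceBank constructor calls fillBank(1), so a discarded bank would presumably have already reserved ids in the database.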