You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/02 14:26:17 UTC
[39/50] [abbrv] incubator-ignite git commit: sprint-2 - Added isWithinTransaction() method to session.
sprint-2 - Added isWithinTransaction() method to session.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16105ec9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16105ec9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16105ec9
Branch: refs/heads/ignite-368
Commit: 16105ec9687732d0b01cfeaee9a5b1c227b0921f
Parents: 6097e7b
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Sat Feb 28 09:44:30 2015 -0800
Committer: Dmitiry Setrakyan <ds...@gridgain.com>
Committed: Sat Feb 28 09:44:30 2015 -0800
----------------------------------------------------------------------
.../store/jdbc/CacheJdbcPersonStore.java | 110 +++++++------------
.../ignite/cache/store/CacheStoreSession.java | 9 ++
.../processors/cache/GridCacheStoreManager.java | 6 +-
.../junits/cache/TestCacheSession.java | 5 +
.../cache/TestThreadLocalCacheSession.java | 5 +
5 files changed, 62 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index d80861d..0473280 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.store.*;
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
-import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
@@ -72,8 +71,6 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** {@inheritDoc} */
@Override public void txEnd(boolean commit) {
- Transaction tx = transaction();
-
Map<String, Connection> props = ses.properties();
try (Connection conn = props.remove(ATTR_NAME)) {
@@ -84,23 +81,21 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
conn.rollback();
}
- System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
+ System.out.println(">>> Transaction ended [commit=" + commit + ']');
}
catch (SQLException e) {
- throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
+ throw new CacheWriterException("Failed to end transaction: " + ses.transaction(), e);
}
}
/** {@inheritDoc} */
@Override public Person load(Long key) {
- Transaction tx = transaction();
-
- System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
+ System.out.println(">>> Loading key: " + key);
Connection conn = null;
try {
- conn = connection(tx);
+ conn = connection();
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
st.setString(1, key.toString());
@@ -108,14 +103,14 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
ResultSet rs = st.executeQuery();
if (rs.next())
- return person(rs.getLong(1), rs.getString(2), rs.getString(3));
+ return new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load object: " + key, e);
}
finally {
- end(tx, conn);
+ end(conn);
}
return null;
@@ -123,60 +118,57 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) {
- Transaction tx = transaction();
-
Long key = entry.getKey();
Person val = entry.getValue();
- System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');
+ System.out.println(">>> Putting [key=" + key + ", val=" + val + ']');
Connection conn = null;
try {
- conn = connection(tx);
+ conn = connection();
- int updated;
+ int updated;
- try (PreparedStatement st = conn.prepareStatement(
- "update PERSONS set firstName=?, lastName=? where id=?")) {
- st.setString(1, val.getFirstName());
- st.setString(2, val.getLastName());
- st.setLong(3, val.getId());
+ // Try update first.
+ try (PreparedStatement st = conn.prepareStatement(
+ "update PERSONS set firstName=?, lastName=? where id=?")) {
+ st.setString(1, val.getFirstName());
+ st.setString(2, val.getLastName());
+ st.setLong(3, val.getId());
- updated = st.executeUpdate();
- }
+ updated = st.executeUpdate();
+ }
- // If update failed, try to insert.
- if (updated == 0) {
- try (PreparedStatement st = conn.prepareStatement(
- "insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) {
- st.setLong(1, val.getId());
- st.setString(2, val.getFirstName());
- st.setString(3, val.getLastName());
+ // If update failed, try to insert.
+ if (updated == 0) {
+ try (PreparedStatement st = conn.prepareStatement(
+ "insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) {
+ st.setLong(1, val.getId());
+ st.setString(2, val.getFirstName());
+ st.setString(3, val.getLastName());
- st.executeUpdate();
+ st.executeUpdate();
+ }
}
}
- }
catch (SQLException e) {
throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e);
}
finally {
- end(tx, conn);
+ end(conn);
}
}
/** {@inheritDoc} */
@Override public void delete(Object key) {
- Transaction tx = transaction();
-
- System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
+ System.out.println(">>> Removing key: " + key);
Connection conn = null;
try {
- conn = connection(tx);
+ conn = connection();
try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
st.setLong(1, (Long)key);
@@ -188,7 +180,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
throw new CacheWriterException("Failed to remove object: " + key, e);
}
finally {
- end(tx, conn);
+ end(conn);
}
}
@@ -199,17 +191,13 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
final int entryCnt = (Integer)args[0];
- Connection conn = null;
-
- try {
- conn = connection(null);
-
+ try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;
while (cnt < entryCnt && rs.next()) {
- Person person = person(rs.getLong(1), rs.getString(2), rs.getString(3));
+ Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
clo.apply(person.getId(), person);
@@ -223,18 +211,16 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
- finally {
- end(null, conn);
- }
}
/**
- * @param tx Cache transaction.
* @return Connection.
* @throws SQLException In case of error.
*/
- private Connection connection(@Nullable Transaction tx) throws SQLException {
- if (tx != null) {
+ private Connection connection() throws SQLException {
+ // If there is an ongoing transaction,
+ // we must reuse the same connection.
+ if (ses.isWithinTransaction()) {
Map<Object, Object> props = ses.properties();
Connection conn = (Connection)props.get(ATTR_NAME);
@@ -257,11 +243,10 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/**
* Closes allocated resources depending on transaction status.
*
- * @param tx Active transaction, if any.
* @param conn Allocated connection.
*/
- private void end(@Nullable Transaction tx, @Nullable Connection conn) {
- if (tx == null && conn != null) {
+ private void end(@Nullable Connection conn) {
+ if (!ses.isWithinTransaction() && conn != null) {
// Close connection right away if there is no transaction.
try {
conn.close();
@@ -286,23 +271,4 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
return conn;
}
-
- /**
- * Builds person object out of provided values.
- *
- * @param id ID.
- * @param firstName First name.
- * @param lastName Last name.
- * @return Person.
- */
- private Person person(Long id, String firstName, String lastName) {
- return new Person(id, firstName, lastName);
- }
-
- /**
- * @return Current transaction.
- */
- private Transaction transaction() {
- return ses != null ? ses.transaction() : null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
index a2be4c5..38fe95c 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
@@ -43,6 +43,15 @@ public interface CacheStoreSession {
public Transaction transaction();
/**
+ * Returns {@code true} if performing store operation within a transaction,
+ * {@code false} otherwise. Analogous to calling {@code transaction() != null}.
+ *
+ * @return {@code True} if performing store operation within a transaction,
+ * {@code false} otherwise.
+ */
+ public boolean isWithinTransaction();
+
+ /**
* Gets current session properties. You can add properties directly to the
* returned map.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
index fac6ea3..9262a8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
@@ -36,7 +36,6 @@ import org.jetbrains.annotations.*;
import javax.cache.*;
import javax.cache.integration.*;
-import java.lang.reflect.*;
import java.util.*;
/**
@@ -913,6 +912,11 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public boolean isWithinTransaction() {
+ return transaction() != null;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K1, V1> Map<K1, V1> properties() {
SessionData ses0 = sesHolder.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
index cca20fe..0709880 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
@@ -50,6 +50,11 @@ public class TestCacheSession implements CacheStoreSession {
}
/** {@inheritDoc} */
+ @Override public boolean isWithinTransaction() {
+ return transaction() != null;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
if (props == null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
index 6687f1f..2bbcf1b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
@@ -49,6 +49,11 @@ public class TestThreadLocalCacheSession implements CacheStoreSession {
}
/** {@inheritDoc} */
+ @Override public boolean isWithinTransaction() {
+ return transaction() != null;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
TestCacheSession ses = sesHolder.get();