You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2017/10/27 18:14:52 UTC
[geode] branch feature/GEODE-3781 updated: Added prepared statement
cache. Debug logging added that should be cleaned up later.
This is an automated email from the ASF dual-hosted git repository.
agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
new fdbfb14 Added prepared statement cache. Debug logging added that should be cleaned up later.
fdbfb14 is described below
commit fdbfb14bf96515d3d30cb8fc5725e57fff7d0601
Author: Anil <ag...@pivotal.io>
AuthorDate: Fri Oct 27 11:13:50 2017 -0700
Added prepared statement cache
Debug logging added that should be cleaned up later
---
.../apache/geode/connectors/jdbc/ColumnValue.java | 5 +
.../geode/connectors/jdbc/JDBCAsyncWriter.java | 1 +
.../apache/geode/connectors/jdbc/JDBCManager.java | 120 ++++++++++++++-------
3 files changed, 86 insertions(+), 40 deletions(-)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java
index de422d5..fa540c2 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java
@@ -36,4 +36,9 @@ public class ColumnValue {
public Object getValue() {
return this.value;
}
+
+ @Override
+ public String toString() {
+ return "ColumnValue [isKey=" + isKey + ", columnName=" + columnName + ", value=" + value + "]";
+ }
}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
index 804301b..c394d12 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
@@ -56,6 +56,7 @@ public class JDBCAsyncWriter implements AsyncEventListener {
// In that case need to serialize and deserialize.
try {
PdxInstance value = (PdxInstance) event.getDeserializedValue();
+ logger.info("AsyncEventListener event : " + event);
this.manager.write(event.getRegion(), event.getOperation(), event.getKey(), value);
successfulEvents += 1;
} catch (RuntimeException ex) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
index 0ec643b..7c7ecea 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
public class JDBCManager {
@@ -45,8 +46,12 @@ public class JDBCManager {
public void write(Region region, Operation operation, Object key, PdxInstance value) {
String tableName = getTableName(region);
+ int pdxTypeId = 0;
+ if (value != null) {
+ pdxTypeId = ((PdxInstanceImpl) value).getPdxType().getTypeId();
+ }
List<ColumnValue> columnList = getColumnToValueList(tableName, key, value, operation);
- int updateCount = executeWrite(columnList, tableName, operation, false);
+ int updateCount = executeWrite(columnList, tableName, operation, pdxTypeId, false);
if (operation.isDestroy()) {
return;
}
@@ -57,7 +62,7 @@ public class JDBCManager {
} else {
upsertOp = Operation.UPDATE;
}
- updateCount = executeWrite(columnList, tableName, upsertOp, true);
+ updateCount = executeWrite(columnList, tableName, upsertOp, pdxTypeId, true);
}
if (updateCount != 1) {
throw new IllegalStateException("Unexpected updateCount " + updateCount);
@@ -65,23 +70,25 @@ public class JDBCManager {
}
private int executeWrite(List<ColumnValue> columnList, String tableName, Operation operation,
- boolean handleException) {
- PreparedStatement pstmt = getQueryStatement(columnList, tableName, operation);
- try {
- int idx = 0;
- for (ColumnValue cv : columnList) {
- idx++;
- pstmt.setObject(idx, cv.getValue());
- }
- pstmt.execute();
- return pstmt.getUpdateCount();
- } catch (SQLException e) {
- if (handleException || operation.isDestroy()) {
- handleSQLException(e);
+ int pdxTypeId, boolean handleException) {
+ PreparedStatement pstmt = getPreparedStatement(columnList, tableName, operation, pdxTypeId);
+ synchronized (pstmt) {
+ try {
+ int idx = 0;
+ for (ColumnValue cv : columnList) {
+ idx++;
+ pstmt.setObject(idx, cv.getValue());
+ }
+ pstmt.execute();
+ return pstmt.getUpdateCount();
+ } catch (SQLException e) {
+ if (handleException || operation.isDestroy()) {
+ handleSQLException(e);
+ }
+ return 0;
+ } finally {
+ clearStatement(pstmt);
}
- return 0;
- } finally {
- clearStatement(pstmt);
}
}
@@ -179,30 +186,63 @@ public class JDBCManager {
return result;
}
- // private final ConcurrentMap<String, PreparedStatement> preparedStatementCache = new
- // ConcurrentHashMap<>();
+ private final ConcurrentMap<StatementKey, PreparedStatement> preparedStatementCache =
+ new ConcurrentHashMap<>();
- private PreparedStatement getQueryStatement(List<ColumnValue> columnList, String tableName,
- Operation operation) {
- // ConcurrentMap<String, PreparedStatement> cache = getPreparedStatementCache(operation);
- // return cache.computeIfAbsent(query, k -> {
- // String query = getQueryString(tableName, columnList, operation);
- // Connection con = getConnection();
- // try {
- // return con.prepareStatement(k);
- // } catch (SQLException e) {
- // handleSQLException(e);
- // }
- // });
- String query = getQueryString(tableName, columnList, operation);
- System.out.println("query=" + query);
- Connection con = getConnection();
- try {
- return con.prepareStatement(query);
- } catch (SQLException e) {
- handleSQLException(e);
- return null; // this line is never reached
+ private static class StatementKey {
+ private final int pdxTypeId;
+ private final Operation operation;
+ private final String tableName;
+
+ public StatementKey(int pdxTypeId, Operation operation, String tableName) {
+ this.pdxTypeId = pdxTypeId;
+ this.operation = operation;
+ this.tableName = tableName;
}
+
+ @Override
+ public int hashCode() {
+ return operation.hashCode() + pdxTypeId + tableName.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ StatementKey other = (StatementKey) obj;
+ if (!operation.equals(other.operation)) {
+ return false;
+ }
+ if (pdxTypeId != other.pdxTypeId) {
+ return false;
+ }
+ if (!tableName.equals(other.tableName)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ private PreparedStatement getPreparedStatement(List<ColumnValue> columnList, String tableName,
+ Operation operation, int pdxTypeId) {
+ System.out.println("getPreparedStatement : " + pdxTypeId + "operation: " + operation
+ + " columns: " + columnList);
+ StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
+ return preparedStatementCache.computeIfAbsent(key, k -> {
+ String query = getQueryString(tableName, columnList, operation);
+ System.out.println("query=" + query);
+ Connection con = getConnection();
+ try {
+ return con.prepareStatement(query);
+ } catch (SQLException e) {
+ handleSQLException(e);
+ return null; // this line is never reached
+ }
+ });
}
private List<ColumnValue> getColumnToValueList(String tableName, Object key, PdxInstance value,
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.