You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/10/31 23:53:58 UTC

[geode] branch feature/GEODE-3781 updated: added read method for loader

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
     new 42a9928  added read method for loader
42a9928 is described below

commit 42a99280293f43dfa639fe23fb23194cbec62c26
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Tue Oct 31 16:53:39 2017 -0700

    added read method for loader
---
 .../apache/geode/connectors/jdbc/JDBCManager.java  | 86 ++++++++++++++++++-
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       | 21 +----
 .../geode/connectors/jdbc/JDBCManagerUnitTest.java | 97 ++++++++++++++++++++--
 3 files changed, 174 insertions(+), 30 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
index 6d6ca77..3440b38 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -31,7 +31,9 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
 import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class JDBCManager {
@@ -58,6 +60,61 @@ public class JDBCManager {
     this.config = config;
   }
 
+  public PdxInstance read(Region region, Object key) {
+    String tableName = getTableName(region);
+    List<ColumnValue> columnList = getColumnToValueList(tableName, key, null, Operation.GET);
+    PreparedStatement pstmt = getPreparedStatement(columnList, tableName, Operation.GET, 0);
+    synchronized (pstmt) {
+      try {
+        int idx = 0;
+        for (ColumnValue cv : columnList) {
+          idx++;
+          pstmt.setObject(idx, cv.getValue());
+        }
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+          InternalCache cache = (InternalCache) region.getRegionService();
+          String objectClassName = getObjectClassName(tableName);
+          PdxInstanceFactory factory;
+          if (objectClassName != null) {
+            factory = cache.createPdxInstanceFactory(objectClassName);
+          } else {
+            factory = cache.createPdxInstanceFactory("no class", false);
+          }
+          ResultSetMetaData rsmd = rs.getMetaData();
+          int ColumnsNumber = rsmd.getColumnCount();
+          String keyColumnName = getKeyColumnName(tableName);
+          for (int i = 1; i <= ColumnsNumber; i++) {
+            Object columnValue = rs.getObject(i);
+            String columnName = rsmd.getColumnName(i);
+            String fieldName = mapColumnNameToFieldName(columnName, tableName);
+            if (!isFieldExcluded(fieldName) && (isKeyPartOfValue(region.getName())
+                || !keyColumnName.equalsIgnoreCase(columnName))) {
+              factory.writeField(fieldName, columnValue, Object.class);
+            }
+          }
+          if (rs.next()) {
+            throw new IllegalStateException(
+                "Multiple rows returned for key " + key + " on table " + tableName);
+          }
+          return factory.create();
+        } else {
+          return null;
+        }
+      } catch (SQLException e) {
+        handleSQLException(e);
+        return null; // this is never reached
+      } finally {
+        clearStatementParameters(pstmt);
+      }
+    }
+  }
+
+  private String getObjectClassName(String tableName) {
+    // TODO NYI
+    return null;
+  }
+
   public void write(Region region, Operation operation, Object key, PdxInstance value) {
     String tableName = getTableName(region);
     int pdxTypeId = 0;
@@ -103,12 +160,12 @@ public class JDBCManager {
         }
         return 0;
       } finally {
-        clearStatement(pstmt);
+        clearStatementParameters(pstmt);
       }
     }
   }
 
-  private void clearStatement(PreparedStatement ps) {
+  private void clearStatementParameters(PreparedStatement ps) {
     try {
       ps.clearParameters();
     } catch (SQLException ignore) {
@@ -123,11 +180,22 @@ public class JDBCManager {
       return getUpdateQueryString(tableName, columnList);
     } else if (operation.isDestroy()) {
       return getDestroyQueryString(tableName, columnList);
+    } else if (operation.isGet()) {
+      return getSelectQueryString(tableName, columnList);
     } else {
       throw new IllegalStateException("unsupported operation " + operation);
     }
   }
 
+  private String getSelectQueryString(String tableName, List<ColumnValue> columnList) {
+    assert columnList.size() == 1;
+    ColumnValue keyCV = columnList.get(0);
+    assert keyCV.isKey();
+    StringBuilder query = new StringBuilder(
+        "SELECT * FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?");
+    return query.toString();
+  }
+
   private String getDestroyQueryString(String tableName, List<ColumnValue> columnList) {
     assert columnList.size() == 1;
     ColumnValue keyCV = columnList.get(0);
@@ -254,7 +322,7 @@ public class JDBCManager {
     StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
     return getPreparedStatementCache().computeIfAbsent(key, k -> {
       String query = getQueryString(tableName, columnList, operation);
-      System.out.println("query=" + query);
+      System.out.println("query=" + query); // TODO remove debugging
       Connection con = getConnection(null, null);
       try {
         return con.prepareStatement(query);
@@ -269,7 +337,7 @@ public class JDBCManager {
       Operation operation) {
     String keyColumnName = getKeyColumnName(tableName);
     ColumnValue keyCV = new ColumnValue(true, keyColumnName, key);
-    if (operation.isDestroy()) {
+    if (operation.isDestroy() || operation.isGet()) {
       return Collections.singletonList(keyCV);
     }
 
@@ -302,6 +370,16 @@ public class JDBCManager {
     return fieldName;
   }
 
+  private String mapColumnNameToFieldName(String columnName, String tableName) {
+    // TODO check config for mapping
+    return columnName.toLowerCase();
+  }
+
+  private boolean isKeyPartOfValue(String regionName) {
+    // TODO check config for mapping
+    return false;
+  }
+
   private String getKeyColumnName(String tableName) {
     return tableToPrimaryKeyMap.computeIfAbsent(tableName, k -> {
       return computeKeyColumnName(k);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 994cbb5..a5efd6c 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -106,24 +106,9 @@ public class JDBCAsyncWriterIntegrationTest {
 
   @Test
   public void canExecuteSQLOnDataBase() throws Exception {
-    stmt.execute("Insert into " + regionTableName + " values ('1', 'emp1', 10)");
-    stmt.execute("Select * from " + regionTableName);
-    DatabaseMetaData metaData = conn.getMetaData();
-    ResultSet tablesRS = metaData.getTables(null, null, "%", null);
-    String realTableName = null;
-    while (tablesRS.next()) {
-      String name = tablesRS.getString("TABLE_NAME");
-      if (name.equalsIgnoreCase(this.regionTableName)) {
-        if (realTableName != null) {
-          throw new IllegalStateException("Duplicate tables that match region name");
-        }
-        realTableName = name;
-      }
-    }
-    if (realTableName == null) {
-      throw new IllegalStateException("no table was found that matches " + regionTableName);
-    }
-    ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, realTableName);
+    stmt.execute("Create Table blobTable" + " (testfield BLOB)");
+    stmt.execute("Insert into blobTable values ('1', 'emp1', 10)");
+    stmt.execute("Select * from blobTable");
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCManagerUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCManagerUnitTest.java
index daa9822..4491b04 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCManagerUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCManagerUnitTest.java
@@ -21,6 +21,7 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
@@ -35,6 +36,8 @@ import org.mockito.ArgumentCaptor;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
 import org.apache.geode.pdx.internal.PdxInstanceImpl;
 import org.apache.geode.pdx.internal.PdxType;
 import org.apache.geode.test.fake.Fakes;
@@ -99,12 +102,14 @@ public class JDBCManagerUnitTest {
 
     private ResultSet tableResults;
     private ResultSet primaryKeyResults;
+    private ResultSet queryResultSet;
 
     TestableJDBCManagerWithResultSets(JDBCConfiguration config, ResultSet tableResults,
-        ResultSet primaryKeyResults) {
+        ResultSet primaryKeyResults, ResultSet queryResultSet) {
       super(config);
       this.tableResults = tableResults;
       this.primaryKeyResults = primaryKeyResults;
+      this.queryResultSet = queryResultSet;
     }
 
     @Override
@@ -130,6 +135,19 @@ public class JDBCManagerUnitTest {
       preparedStatement = mock(PreparedStatement.class);
       when(preparedStatement.getUpdateCount()).thenReturn(1);
 
+
+      if (this.queryResultSet == null) {
+        queryResultSet = mock(ResultSet.class);
+        ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
+        when(rsmd.getColumnCount()).thenReturn(3);
+        when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
+        when(queryResultSet.getMetaData()).thenReturn(rsmd);
+        when(queryResultSet.next()).thenReturn(true, false);
+        when(queryResultSet.getObject(anyInt())).thenReturn("1", "Emp1", 21);
+      }
+
+      when(preparedStatement.executeQuery()).thenReturn(queryResultSet);
+
       connection = mock(Connection.class);
       when(connection.getMetaData()).thenReturn(metaData);
       when(connection.prepareStatement(any())).thenReturn(preparedStatement);
@@ -153,7 +171,8 @@ public class JDBCManagerUnitTest {
     when(rs.next()).thenReturn(true, false);
     when(rs.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
 
-    this.mgr = new TestableJDBCManagerWithResultSets(createConfiguration(driver, url), rs, rsKeys);
+    this.mgr =
+        new TestableJDBCManagerWithResultSets(createConfiguration(driver, url), rs, rsKeys, null);
   }
 
   private void createDefaultManager() throws SQLException {
@@ -169,9 +188,11 @@ public class JDBCManagerUnitTest {
         upsertReturn);
   }
 
-  private void createManager(ResultSet tableNames, ResultSet primaryKeys) {
-    this.mgr = new TestableJDBCManagerWithResultSets(
-        createConfiguration("java.lang.String", "fakeURL"), tableNames, primaryKeys);
+  private void createManager(ResultSet tableNames, ResultSet primaryKeys,
+      ResultSet queryResultSet) {
+    this.mgr =
+        new TestableJDBCManagerWithResultSets(createConfiguration("java.lang.String", "fakeURL"),
+            tableNames, primaryKeys, queryResultSet);
   }
 
   private JDBCConfiguration createConfiguration(String driver, String url) {
@@ -428,7 +449,7 @@ public class JDBCManagerUnitTest {
     ResultSet tables = mock(ResultSet.class);
     when(tables.next()).thenReturn(true, true, false);
     when(tables.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
-    createManager(tables, primaryKeys);
+    createManager(tables, primaryKeys, null);
     catchException(this.mgr).computeKeyColumnName(regionName);
     assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
     assertThat(caughtException().getMessage()).isEqualTo("Duplicate tables that match region name");
@@ -439,7 +460,7 @@ public class JDBCManagerUnitTest {
     ResultSet tables = null;
     ResultSet primaryKeys = mock(ResultSet.class);
     when(primaryKeys.next()).thenReturn(false);
-    createManager(tables, primaryKeys);
+    createManager(tables, primaryKeys, null);
     catchException(this.mgr).computeKeyColumnName(regionName);
     assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
     assertThat(caughtException().getMessage())
@@ -451,10 +472,70 @@ public class JDBCManagerUnitTest {
     ResultSet tables = null;
     ResultSet primaryKeys = mock(ResultSet.class);
     when(primaryKeys.next()).thenReturn(true, true, false);
-    createManager(tables, primaryKeys);
+    createManager(tables, primaryKeys, null);
     catchException(this.mgr).computeKeyColumnName(regionName);
     assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
     assertThat(caughtException().getMessage())
         .isEqualTo("The table " + regionName + " has more than one primary key column.");
   }
+
+  @Test
+  public void verifyReadThatMissesReturnsNull() throws SQLException {
+    ResultSet queryResults = mock(ResultSet.class);
+    when(queryResults.next()).thenReturn(false);
+    createManager(null, null, queryResults);
+    GemFireCacheImpl cache = Fakes.cache();
+    Region region = Fakes.region(regionName, cache);
+    assertThat(this.mgr.read(region, "2")).isNull();
+  }
+
+  @Test
+  public void verifyReadWithMultipleResultsFails() throws SQLException {
+    ResultSet queryResults = mock(ResultSet.class);
+    when(queryResults.next()).thenReturn(true, true, false);
+    ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
+    when(rsmd.getColumnCount()).thenReturn(3);
+    when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
+    when(queryResults.getMetaData()).thenReturn(rsmd);
+    createManager(null, null, queryResults);
+    GemFireCacheImpl cache = Fakes.cache();
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    PdxInstance pi = mock(PdxInstance.class);
+    when(factory.create()).thenReturn(pi);
+    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
+    Region region = Fakes.region(regionName, cache);
+    catchException(this.mgr).read(region, "1");
+    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
+    assertThat(caughtException().getMessage())
+        .isEqualTo("Multiple rows returned for key 1 on table " + regionName);
+  }
+
+  @Test
+  public void verifyReadThatHitsReturnsValue() throws SQLException {
+    createDefaultManager();
+    GemFireCacheImpl cache = Fakes.cache();
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    PdxInstance pi = mock(PdxInstance.class);
+    when(factory.create()).thenReturn(pi);
+    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
+
+    Region region = Fakes.region(regionName, cache);
+    Object key = "1";
+    PdxInstance value = this.mgr.read(region, key);
+    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+    verify(this.connection).prepareStatement(sqlCaptor.capture());
+    assertThat(sqlCaptor.getValue())
+        .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+    List<Object> allObjects = objectCaptor.getAllValues();
+    assertThat(allObjects.get(0)).isEqualTo("1");
+    assertThat(value).isSameAs(pi);
+    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+    verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+        any());
+    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
+    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21));
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.