You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2017/10/23 19:08:04 UTC

[geode] 01/02: Initial project setup for jdbc-connector. Added basic source and test implementation for JDBCAsyncWriter.

This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f312f305077fe29887ee8c1e7ccd5a4d511fab15
Author: Anil <ag...@pivotal.io>
AuthorDate: Mon Oct 23 12:01:58 2017 -0700

    Initial project setup for jdbc-connector.
    Added basic source and test implementation for JDBCAsyncWriter.
---
 geode-assembly/build.gradle                        |   1 +
 geode-connectors/build.gradle                      |  32 +++++
 .../geode/connectors/jdbc/JDBCAsyncWriter.java     |  58 +++++++++
 .../apache/geode/connectors/jdbc/JDBCHelper.java   |   5 +
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       | 144 +++++++++++++++++++++
 settings.gradle                                    |   1 +
 6 files changed, 241 insertions(+)

diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle
index 48a5cfa..ac0ac7e 100755
--- a/geode-assembly/build.gradle
+++ b/geode-assembly/build.gradle
@@ -58,6 +58,7 @@ dependencies {
   archives project(':geode-common')  
   archives project(':geode-json')  
   archives project(':geode-core')
+  archives project(':geode-connectors')
   archives project(':geode-lucene')
   archives project(':geode-old-client-support')
   archives project(':geode-protobuf')
diff --git a/geode-connectors/build.gradle b/geode-connectors/build.gradle
new file mode 100644
index 0000000..b11f352
--- /dev/null
+++ b/geode-connectors/build.gradle
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+    compile project(':geode-core')
+    compile project(':geode-common')
+
+    testCompile project(':geode-junit')
+
+    //Connectors test framework.
+    testRuntime 'org.apache.derby:derby:' + project.'derby.version'
+    testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version'
+    testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version'
+    testCompile files(project(':geode-core').sourceSets.test.output)
+    testCompile project(':geode-old-versions')
+}
+
+integrationTest.forkEvery 0
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
new file mode 100644
index 0000000..cc0b30d
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+
+/*
+ * This class provides write behind cache semantics for a JDBC data source using AsyncEventListener.
+ * 
+ * @since Geode 1.4
+ */
+public class JDBCAsyncWriter implements AsyncEventListener {
+
+  private long totalEvents = 0;
+
+  private long successfulEvents = 0;
+
+  @Override
+  public void close() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    totalEvents += events.size();
+    return true;
+  }
+
+  @Override
+  public void init(Properties props) {
+
+  };
+
+  public long getTotalEvents() {
+    return this.totalEvents;
+  }
+
+  public long getsuccessfulEvents() {
+    return this.successfulEvents;
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java
new file mode 100644
index 0000000..edad4c4
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java
@@ -0,0 +1,5 @@
+package org.apache.geode.connectors.jdbc;
+
+public class JDBCHelper {
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
new file mode 100644
index 0000000..d135dfc
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.*;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.awaitility.Awaitility;
+
+@Category(IntegrationTest.class)
+public class JDBCAsyncWriterIntegrationTest {
+
+  private Cache cache;
+  
+  private Connection conn;
+  
+  private Statement stmt;
+
+  JDBCAsyncWriter jdbcWriter;
+
+  private String dbName="DerbyDB";
+  
+  private String regionTableName = "employees";
+  
+  @Before
+  public void setup() throws Exception {
+    try {
+      cache = CacheFactory.getAnyInstance();
+    } catch (Exception e) {
+      // ignore
+    }
+    if (null == cache) {
+      cache = (GemFireCacheImpl) new CacheFactory().set(MCAST_PORT, "0").create();
+    }
+    setupDB();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache = null;
+    }
+    closeDB();
+  }
+
+  public void setupDB() throws Exception {
+    String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+    String connectionURL = "jdbc:derby:memory:" + dbName + ";create=true";
+    Class.forName(driver);
+    conn = DriverManager.getConnection(connectionURL);
+    stmt = conn.createStatement();
+    stmt.execute("Create Table " + regionTableName + " (id varchar(10), name varchar(10))");
+  }
+
+  public void closeDB() throws Exception {
+    if (stmt == null) {
+      stmt = conn.createStatement();
+    }
+    stmt.execute("Drop table " + regionTableName);
+    stmt.close();
+    
+    if (conn != null) {
+      conn.close();
+    }
+  }
+
+  @Test
+  public void canInstallJDBCAsyncWriterOnRegion() {
+    Region employees = createRegionWithJDBCAsyncWriter("employees");
+    employees.put("1", "Emp1");
+    try {Thread.sleep(100);} catch (Exception ex){}
+    employees.put("2", "Emp2");
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+    .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
+
+  }
+
+  @Test
+  public void jdbcAsyncWriterCanWriteToDatabase() throws Exception {
+    Region employees = createRegionWithJDBCAsyncWriter("employees");
+    
+    employees.put("1", "Emp1");
+    employees.put("2", "Emp2");
+
+    validateTableRowCount(2);
+  }
+  
+  private Region createRegionWithJDBCAsyncWriter(String regionName) {
+    jdbcWriter = new JDBCAsyncWriter();
+    cache.createAsyncEventQueueFactory().setBatchSize(1)
+    .setBatchTimeInterval(1).create("jdbcAsyncQueue", jdbcWriter);
+
+    RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    rf.addAsyncEventQueueId("jdbcAsyncQueue");
+    return rf.create(regionName); 
+  }
+  
+  private void validateTableRowCount(int expected) throws Exception {
+    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> {
+      int size = 0;
+      try {
+        ResultSet rs = stmt.executeQuery("select count(*) from " + regionTableName);
+        while (rs.next()) {
+          size = rs.getInt(1);
+        }
+      } catch (Exception ex) {
+        // Need to fix this.
+        System.out.println("Exception while getting the table row count");
+      }
+      assertThat(size).isEqualTo(expected);
+    });
+  }
+ 
+}
diff --git a/settings.gradle b/settings.gradle
index 87f4a9c..9cbad6d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -40,6 +40,7 @@ include 'extensions/geode-modules-assembly'
 include 'geode-protobuf'
 include 'extensions/session-testing-war'
 include 'geode-concurrency-test'
+include 'geode-connectors'
 
 if (GradleVersion.current() < GradleVersion.version(minimumGradleVersion)) {
   throw new GradleException('Running with unsupported Gradle Version. Use Gradle Wrapper or with Gradle version >= ' + minimumGradleVersion)

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.