You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2017/10/23 19:08:04 UTC
[geode] 01/02: Initial project setup for jdbc-connector. Added
basic source and test implementation for JDBCAsyncWriter.
This is an automated email from the ASF dual-hosted git repository.
agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git
commit f312f305077fe29887ee8c1e7ccd5a4d511fab15
Author: Anil <ag...@pivotal.io>
AuthorDate: Mon Oct 23 12:01:58 2017 -0700
Initial project setup for jdbc-connector.
Added basic source and test implementation for JDBCAsyncWriter.
---
geode-assembly/build.gradle | 1 +
geode-connectors/build.gradle | 32 +++++
.../geode/connectors/jdbc/JDBCAsyncWriter.java | 58 +++++++++
.../apache/geode/connectors/jdbc/JDBCHelper.java | 5 +
.../jdbc/JDBCAsyncWriterIntegrationTest.java | 144 +++++++++++++++++++++
settings.gradle | 1 +
6 files changed, 241 insertions(+)
diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle
index 48a5cfa..ac0ac7e 100755
--- a/geode-assembly/build.gradle
+++ b/geode-assembly/build.gradle
@@ -58,6 +58,7 @@ dependencies {
archives project(':geode-common')
archives project(':geode-json')
archives project(':geode-core')
+ archives project(':geode-connectors')
archives project(':geode-lucene')
archives project(':geode-old-client-support')
archives project(':geode-protobuf')
diff --git a/geode-connectors/build.gradle b/geode-connectors/build.gradle
new file mode 100644
index 0000000..b11f352
--- /dev/null
+++ b/geode-connectors/build.gradle
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+  // Production code builds against the core cache API and the shared utility module.
+  compile(project(':geode-core'))
+  compile(project(':geode-common'))
+
+  // Shared JUnit helpers (test categories, rules).
+  testCompile(project(':geode-junit'))
+
+  // Connectors test framework: Derby is only needed at test runtime; versions are read from
+  // project properties.
+  testRuntime("org.apache.derby:derby:${project.'derby.version'}")
+  testCompile("com.pholser:junit-quickcheck-core:${project.'junit-quickcheck.version'}")
+  testCompile("com.pholser:junit-quickcheck-generators:${project.'junit-quickcheck.version'}")
+  // Re-use the compiled test classes of geode-core.
+  testCompile(files(project(':geode-core').sourceSets.test.output))
+  testCompile(project(':geode-old-versions'))
+}
+
+// A forkEvery value of 0 puts no limit on the number of test classes run per forked JVM.
+integrationTest.forkEvery(0)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
new file mode 100644
index 0000000..cc0b30d
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+
+/**
+ * Provides write-behind cache semantics for a JDBC data source using an
+ * {@link AsyncEventListener}.
+ *
+ * <p>
+ * This initial version does not write to a database yet; it only counts the events it is handed.
+ * The counters are {@link AtomicLong}s because events are delivered asynchronously by the queue
+ * while the counters may be read from a different thread (for example a polling test).
+ *
+ * @since Geode 1.4
+ */
+public class JDBCAsyncWriter implements AsyncEventListener {
+
+  /** Number of events handed to {@link #processEvents(List)} so far. */
+  private final AtomicLong totalEvents = new AtomicLong();
+
+  /** Number of events written to the data source; nothing increments it yet. */
+  private final AtomicLong successfulEvents = new AtomicLong();
+
+  /** Releases resources held by this writer; there are currently none. */
+  @Override
+  public void close() {
+    // Nothing to release: this writer does not hold a JDBC connection yet.
+  }
+
+  /**
+   * Records the size of the delivered batch in the total-events counter.
+   *
+   * @param events the batch of events delivered by the async event queue
+   * @return always {@code true}, reporting the batch as processed
+   */
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    totalEvents.addAndGet(events.size());
+    return true;
+  }
+
+  /**
+   * Accepts configuration properties; they are currently ignored.
+   *
+   * @param props configuration properties supplied when the listener is declared
+   */
+  @Override
+  public void init(Properties props) {
+    // No configuration is consumed yet.
+  }
+
+  /**
+   * Returns how many events have been handed to {@link #processEvents(List)}.
+   *
+   * @return the total number of events received so far
+   */
+  public long getTotalEvents() {
+    return totalEvents.get();
+  }
+
+  /**
+   * Returns how many events were written successfully. The lower-case {@code s} in the method
+   * name is kept so existing callers continue to compile.
+   *
+   * @return the number of successfully written events; always 0 in this version
+   */
+  public long getsuccessfulEvents() {
+    return successfulEvents.get();
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java
new file mode 100644
index 0000000..edad4c4
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java
@@ -0,0 +1,5 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+/**
+ * Placeholder for helper logic of the JDBC connector. The class currently has no members.
+ */
+public class JDBCHelper {
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
new file mode 100644
index 0000000..d135dfc
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.*;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.awaitility.Awaitility;
+
+@Category(IntegrationTest.class)
+public class JDBCAsyncWriterIntegrationTest {
+
+ private Cache cache;
+
+ private Connection conn;
+
+ private Statement stmt;
+
+ JDBCAsyncWriter jdbcWriter;
+
+ private String dbName="DerbyDB";
+
+ private String regionTableName = "employees";
+
+ @Before
+ public void setup() throws Exception {
+ try {
+ cache = CacheFactory.getAnyInstance();
+ } catch (Exception e) {
+ // ignore
+ }
+ if (null == cache) {
+ cache = (GemFireCacheImpl) new CacheFactory().set(MCAST_PORT, "0").create();
+ }
+ setupDB();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (cache != null && !cache.isClosed()) {
+ cache.close();
+ cache = null;
+ }
+ closeDB();
+ }
+
+ public void setupDB() throws Exception {
+ String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+ String connectionURL = "jdbc:derby:memory:" + dbName + ";create=true";
+ Class.forName(driver);
+ conn = DriverManager.getConnection(connectionURL);
+ stmt = conn.createStatement();
+ stmt.execute("Create Table " + regionTableName + " (id varchar(10), name varchar(10))");
+ }
+
+ public void closeDB() throws Exception {
+ if (stmt == null) {
+ stmt = conn.createStatement();
+ }
+ stmt.execute("Drop table " + regionTableName);
+ stmt.close();
+
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void canInstallJDBCAsyncWriterOnRegion() {
+ Region employees = createRegionWithJDBCAsyncWriter("employees");
+ employees.put("1", "Emp1");
+ try {Thread.sleep(100);} catch (Exception ex){}
+ employees.put("2", "Emp2");
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
+
+ }
+
+ @Test
+ public void jdbcAsyncWriterCanWriteToDatabase() throws Exception {
+ Region employees = createRegionWithJDBCAsyncWriter("employees");
+
+ employees.put("1", "Emp1");
+ employees.put("2", "Emp2");
+
+ validateTableRowCount(2);
+ }
+
+ private Region createRegionWithJDBCAsyncWriter(String regionName) {
+ jdbcWriter = new JDBCAsyncWriter();
+ cache.createAsyncEventQueueFactory().setBatchSize(1)
+ .setBatchTimeInterval(1).create("jdbcAsyncQueue", jdbcWriter);
+
+ RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+ rf.addAsyncEventQueueId("jdbcAsyncQueue");
+ return rf.create(regionName);
+ }
+
+ private void validateTableRowCount(int expected) throws Exception {
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> {
+ int size = 0;
+ try {
+ ResultSet rs = stmt.executeQuery("select count(*) from " + regionTableName);
+ while (rs.next()) {
+ size = rs.getInt(1);
+ }
+ } catch (Exception ex) {
+ // Need to fix this.
+ System.out.println("Exception while getting the table row count");
+ }
+ assertThat(size).isEqualTo(expected);
+ });
+ }
+
+}
diff --git a/settings.gradle b/settings.gradle
index 87f4a9c..9cbad6d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -40,6 +40,7 @@ include 'extensions/geode-modules-assembly'
include 'geode-protobuf'
include 'extensions/session-testing-war'
include 'geode-concurrency-test'
+include 'geode-connectors'
if (GradleVersion.current() < GradleVersion.version(minimumGradleVersion)) {
throw new GradleException('Running with unsupported Gradle Version. Use Gradle Wrapper or with Gradle version >= ' + minimumGradleVersion)
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.