You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/03/15 21:46:40 UTC
[geode] branch develop updated: GEODE-4844: JdbcWriter and
JdbcAsyncWriter will write rows loaded by the JdbcLoader (#1618)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5dc08bf GEODE-4844: JdbcWriter and JdbcAsyncWriter will write rows loaded by the JdbcLoader (#1618)
5dc08bf is described below
commit 5dc08bf0878c9cef38fa9c321a638656531beb09
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Mar 15 14:46:37 2018 -0700
GEODE-4844: JdbcWriter and JdbcAsyncWriter will write rows loaded by the JdbcLoader (#1618)
* JdbcWriter and JdbcAsyncWriter will no longer write to sql
if the event was from a load. This will cause them to never write
back out events from the JdbcLoader. It also means that if someone
is using one of the jdbc writers on a region that uses some other loader
that the jdbc writer will ignore data loaded by it. We think the common
use case will be if you have a jdbc writer you will either also have the
JdbcLoader or no loader. In the future we may need to add a feature so
that a user can tell the jdbc writer to not ignore load events.
---
.../geode/connectors/jdbc/JdbcAsyncWriter.java | 20 ++++++++--
.../apache/geode/connectors/jdbc/JdbcWriter.java | 11 ++++++
.../jdbc/internal/AbstractJdbcCallback.java | 14 +++----
.../geode/connectors/jdbc/JdbcAsyncWriterTest.java | 18 ++++++++-
.../geode/connectors/jdbc/JdbcDUnitTest.java | 34 +++++++++++++++++
.../geode/connectors/jdbc/JdbcWriterTest.java | 44 +++++++++++++++++++---
.../jdbc/internal/AbstractJdbcCallbackTest.java | 22 +++++++++++
7 files changed, 145 insertions(+), 18 deletions(-)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index f6d2d6b..e36469d 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -39,9 +39,10 @@ import org.apache.geode.pdx.PdxInstance;
public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventListener {
private static final Logger logger = LogService.getLogger();
- private LongAdder totalEvents = new LongAdder();
- private LongAdder successfulEvents = new LongAdder();
- private LongAdder failedEvents = new LongAdder();
+ private final LongAdder totalEvents = new LongAdder();
+ private final LongAdder successfulEvents = new LongAdder();
+ private final LongAdder failedEvents = new LongAdder();
+ private final LongAdder ignoredEvents = new LongAdder();
@SuppressWarnings("unused")
public JdbcAsyncWriter() {
@@ -65,7 +66,10 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
cache.setPdxReadSerializedOverride(true);
try {
for (AsyncEvent event : events) {
-
+ if (eventCanBeIgnored(event.getOperation())) {
+ changeIgnoredEvents(1);
+ continue;
+ }
try {
getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
getPdxInstance(event));
@@ -93,6 +97,10 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
return failedEvents.longValue();
}
+ long getIgnoredEvents() {
+ return ignoredEvents.longValue();
+ }
+
private void changeSuccessfulEvents(long delta) {
successfulEvents.add(delta);
}
@@ -105,6 +113,10 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
totalEvents.add(delta);
}
+ private void changeIgnoredEvents(long delta) {
+ ignoredEvents.add(delta);
+ }
+
/**
* precondition: DefaultQuery.setPdxReadSerialized(true)
*/
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
index 193066f..998cb8a 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
@@ -15,6 +15,7 @@
package org.apache.geode.connectors.jdbc;
import java.sql.SQLException;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.geode.CopyHelper;
import org.apache.geode.annotations.Experimental;
@@ -36,6 +37,8 @@ import org.apache.geode.pdx.PdxInstance;
@Experimental
public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWriter<K, V> {
+ private final LongAdder totalEvents = new LongAdder();
+
@SuppressWarnings("unused")
public JdbcWriter() {
super();
@@ -73,7 +76,11 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWrite
}
private void writeEvent(EntryEvent<K, V> event) {
+ if (eventCanBeIgnored(event.getOperation())) {
+ return;
+ }
checkInitialized((InternalCache) event.getRegion().getRegionService());
+ totalEvents.add(1);
try {
getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
getPdxNewValue(event));
@@ -105,4 +112,8 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWrite
cache.setPdxReadSerializedOverride(initialPdxReadSerialized);
}
}
+
+ long getTotalEvents() {
+ return totalEvents.longValue();
+ }
}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
index 4b23f6e..b3baabe 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -14,17 +14,16 @@
*/
package org.apache.geode.connectors.jdbc.internal;
-import java.util.Properties;
-
import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.CacheCallback;
+import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.InternalCache;
@Experimental
public abstract class AbstractJdbcCallback implements CacheCallback {
private volatile SqlHandler sqlHandler;
- protected volatile InternalCache cache;
+ protected InternalCache cache;
protected AbstractJdbcCallback() {
// nothing
@@ -42,11 +41,6 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
}
}
- @Override
- public void init(Properties props) {
- // nothing
- }
-
protected SqlHandler getSqlHandler() {
return sqlHandler;
}
@@ -57,6 +51,10 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
}
}
+ protected boolean eventCanBeIgnored(Operation operation) {
+ return operation.isLoad();
+ }
+
private synchronized void initialize(InternalCache cache) {
if (sqlHandler == null) {
this.cache = cache;
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
index 3d348c1..4a55a45 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Operation;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.connectors.jdbc.internal.SqlHandler;
import org.apache.geode.internal.cache.InternalCache;
@@ -82,6 +83,16 @@ public class JdbcAsyncWriterTest {
}
@Test
+ public void ignoresLoadEvent() throws Exception {
+ writer.processEvents(Collections.singletonList(createMockEvent(Operation.LOCAL_LOAD_CREATE)));
+
+ verify(sqlHandler, times(0)).write(any(), any(), any(), any());
+ assertThat(writer.getIgnoredEvents()).isEqualTo(1);
+ assertThat(writer.getTotalEvents()).isEqualTo(1);
+ assertThat(writer.getFailedEvents()).isEqualTo(0);
+ }
+
+ @Test
public void writesMultipleProvidedEvents() throws Exception {
List<AsyncEvent> events = new ArrayList<>();
events.add(createMockEvent());
@@ -95,9 +106,14 @@ public class JdbcAsyncWriterTest {
assertThat(writer.getTotalEvents()).isEqualTo(3);
}
- private AsyncEvent createMockEvent() {
+ private AsyncEvent createMockEvent(Operation op) {
AsyncEvent event = mock(AsyncEvent.class);
+ when(event.getOperation()).thenReturn(op);
when(event.getRegion()).thenReturn(region);
return event;
}
+
+ private AsyncEvent createMockEvent() {
+ return createMockEvent(Operation.CREATE);
+ }
}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
index 4c4e16a..ed86c82 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
@@ -300,15 +300,49 @@ public class JdbcDUnitTest implements Serializable {
region.put(key, pdxEmployee1);
region.invalidate(key);
+ JdbcWriter writer = (JdbcWriter) region.getAttributes().getCacheWriter();
+ long writeCallsCompletedBeforeGet = writer.getTotalEvents();
+
PdxInstance result = (PdxInstance) region.get(key);
assertThat(result.getFieldNames()).hasSize(3);
assertThat(result.getField("id")).isEqualTo(key);
assertThat(result.getField("name")).isEqualTo("Emp1");
assertThat(result.getField("age")).isEqualTo(55);
+ assertThat(writer.getTotalEvents()).isEqualTo(writeCallsCompletedBeforeGet);
});
}
@Test
+ public void getReadsFromDBWithAsyncWriter() throws Exception {
+ createTable();
+ createRegionUsingGfsh(false, true, true);
+ createJdbcConnection();
+ createMapping(REGION_NAME, CONNECTION_NAME);
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+ ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+ String key = "id1";
+ Region region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+ JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter) ClusterStartupRule.getCache()
+ .getAsyncEventQueue("JAW").getAsyncEventListener();
+
+ region.put(key, pdxEmployee1);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ assertThat(asyncWriter.getSuccessfulEvents()).isEqualTo(1);
+ });
+ region.invalidate(key);
+ PdxInstance result = (PdxInstance) region.get(key);
+
+ assertThat(result.getField("id")).isEqualTo(pdxEmployee1.getField("id"));
+ assertThat(result.getField("name")).isEqualTo(pdxEmployee1.getField("name"));
+ assertThat(result.getField("age")).isEqualTo(pdxEmployee1.getField("age"));
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ assertThat(asyncWriter.getIgnoredEvents()).isEqualTo(1);
+ });
+ });
+ }
+
public void getReadsFromDBWithPdxClassName() throws Exception {
createTable();
createRegionUsingGfsh(true, false, true);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
index 27cc370..b1773a1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.connectors.jdbc;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -28,6 +29,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.SerializedCacheValue;
import org.apache.geode.connectors.jdbc.internal.SqlHandler;
@@ -43,9 +45,11 @@ public class JdbcWriterTest {
private EntryEvent<Object, Object> entryEvent;
private PdxInstance pdxInstance;
private SqlHandler sqlHandler;
+ private InternalRegion region;
private SerializedCacheValue<Object> serializedNewValue;
private RegionEvent<Object, Object> regionEvent;
private InternalCache cache;
+ private Object key;
private JdbcWriter<Object, Object> writer;
@@ -54,13 +58,17 @@ public class JdbcWriterTest {
entryEvent = mock(EntryEvent.class);
pdxInstance = mock(PdxInstance.class);
sqlHandler = mock(SqlHandler.class);
+ region = mock(InternalRegion.class);
serializedNewValue = mock(SerializedCacheValue.class);
regionEvent = mock(RegionEvent.class);
cache = Fakes.cache();
+ key = "key";
- when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+ when(entryEvent.getRegion()).thenReturn(region);
+ when(entryEvent.getKey()).thenReturn(key);
when(entryEvent.getRegion().getRegionService()).thenReturn(cache);
when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+ when(entryEvent.getOperation()).thenReturn(Operation.CREATE);
when(serializedNewValue.getDeserializedValue()).thenReturn(pdxInstance);
writer = new JdbcWriter<>(sqlHandler, cache);
@@ -70,7 +78,7 @@ public class JdbcWriterTest {
public void beforeUpdateWithPdxInstanceWritesToSqlHandler() throws Exception {
writer.beforeUpdate(entryEvent);
- verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+ verify(sqlHandler, times(1)).write(eq(region), eq(Operation.CREATE), eq(key), eq(pdxInstance));
}
@Test
@@ -85,14 +93,40 @@ public class JdbcWriterTest {
public void beforeCreateWithPdxInstanceWritesToSqlHandler() throws Exception {
writer.beforeCreate(entryEvent);
- verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+ verify(sqlHandler, times(1)).write(eq(region), eq(Operation.CREATE), eq(key), eq(pdxInstance));
+ assertThat(writer.getTotalEvents()).isEqualTo(1);
}
@Test
- public void beforeDestroyWithPdxInstanceWritesToSqlHandler() throws Exception {
+ public void beforeCreateWithNewPdxInstanceWritesToSqlHandler() throws Exception {
+ PdxInstance newPdxInstance = mock(PdxInstance.class);
+ when(entryEvent.getNewValue()).thenReturn(newPdxInstance);
+ when(entryEvent.getSerializedNewValue()).thenReturn(null);
+ writer.beforeCreate(entryEvent);
+
+ verify(sqlHandler, times(1)).write(eq(region), eq(Operation.CREATE), eq(key),
+ eq(newPdxInstance));
+ assertThat(writer.getTotalEvents()).isEqualTo(1);
+ }
+
+ @Test
+ public void beforeCreateWithLoadEventDoesNothing() throws Exception {
+ when(entryEvent.getOperation()).thenReturn(Operation.LOCAL_LOAD_CREATE);
+
+ writer.beforeCreate(entryEvent);
+
+ verify(sqlHandler, times(0)).write(any(), any(), any(), any());
+ assertThat(writer.getTotalEvents()).isEqualTo(0);
+ }
+
+ @Test
+ public void beforeDestroyWithDestroyEventWritesToSqlHandler() throws Exception {
+ when(entryEvent.getOperation()).thenReturn(Operation.DESTROY);
+ when(entryEvent.getSerializedNewValue()).thenReturn(null);
+
writer.beforeDestroy(entryEvent);
- verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+ verify(sqlHandler, times(1)).write(eq(region), eq(Operation.DESTROY), eq(key), eq(null));
}
@Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
index c089b76..62df1af 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
@@ -26,6 +26,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.fake.Fakes;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -51,6 +52,14 @@ public class AbstractJdbcCallbackTest {
}
@Test
+ public void closeDoesNothingIfSqlHandlerNull() {
+ jdbcCallback = new AbstractJdbcCallback(null, cache) {};
+ jdbcCallback.close();
+ verify(sqlHandler, times(0)).close();
+ }
+
+
+ @Test
public void returnsCorrectSqlHander() {
assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
}
@@ -73,4 +82,17 @@ public class AbstractJdbcCallbackTest {
assertThat(jdbcCallback.getSqlHandler()).isNotNull();
}
+
+ @Test
+ public void verifyLoadsAreIgnored() {
+ boolean ignoreEvent = jdbcCallback.eventCanBeIgnored(Operation.LOCAL_LOAD_CREATE);
+ assertThat(ignoreEvent).isTrue();
+ }
+
+ @Test
+ public void verifyCreateAreNotIgnored() {
+ boolean ignoreEvent = jdbcCallback.eventCanBeIgnored(Operation.CREATE);
+ assertThat(ignoreEvent).isFalse();
+ }
+
}
--
To stop receiving notification emails like this one, please contact
dschneider@apache.org.