You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/03/15 21:46:40 UTC

[geode] branch develop updated: GEODE-4844: JdbcWriter and JdbcAsyncWriter will write rows loaded by the JdbcLoader (#1618)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5dc08bf  GEODE-4844: JdbcWriter and JdbcAsyncWriter will write rows loaded by the JdbcLoader (#1618)
5dc08bf is described below

commit 5dc08bf0878c9cef38fa9c321a638656531beb09
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Mar 15 14:46:37 2018 -0700

    GEODE-4844: JdbcWriter and JdbcAsyncWriter will write rows loaded by the JdbcLoader (#1618)
    
    * JdbcWriter and JdbcAsyncWriter will no longer write to SQL
      if the event came from a load. As a result, they will never write
      back out events produced by the JdbcLoader. It also means that if someone
      is using one of the JDBC writers on a region that uses some other loader,
      the JDBC writer will ignore data loaded by that loader. We think the common
      use case is that if you have a JDBC writer, you will either also have the
      JdbcLoader or no loader. In the future we may need to add a feature so
      that a user can tell the JDBC writer not to ignore load events.
---
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     | 20 ++++++++--
 .../apache/geode/connectors/jdbc/JdbcWriter.java   | 11 ++++++
 .../jdbc/internal/AbstractJdbcCallback.java        | 14 +++----
 .../geode/connectors/jdbc/JdbcAsyncWriterTest.java | 18 ++++++++-
 .../geode/connectors/jdbc/JdbcDUnitTest.java       | 34 +++++++++++++++++
 .../geode/connectors/jdbc/JdbcWriterTest.java      | 44 +++++++++++++++++++---
 .../jdbc/internal/AbstractJdbcCallbackTest.java    | 22 +++++++++++
 7 files changed, 145 insertions(+), 18 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index f6d2d6b..e36469d 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -39,9 +39,10 @@ import org.apache.geode.pdx.PdxInstance;
 public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventListener {
   private static final Logger logger = LogService.getLogger();
 
-  private LongAdder totalEvents = new LongAdder();
-  private LongAdder successfulEvents = new LongAdder();
-  private LongAdder failedEvents = new LongAdder();
+  private final LongAdder totalEvents = new LongAdder();
+  private final LongAdder successfulEvents = new LongAdder();
+  private final LongAdder failedEvents = new LongAdder();
+  private final LongAdder ignoredEvents = new LongAdder();
 
   @SuppressWarnings("unused")
   public JdbcAsyncWriter() {
@@ -65,7 +66,10 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
     cache.setPdxReadSerializedOverride(true);
     try {
       for (AsyncEvent event : events) {
-
+        if (eventCanBeIgnored(event.getOperation())) {
+          changeIgnoredEvents(1);
+          continue;
+        }
         try {
           getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
               getPdxInstance(event));
@@ -93,6 +97,10 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
     return failedEvents.longValue();
   }
 
+  long getIgnoredEvents() {
+    return ignoredEvents.longValue();
+  }
+
   private void changeSuccessfulEvents(long delta) {
     successfulEvents.add(delta);
   }
@@ -105,6 +113,10 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
     totalEvents.add(delta);
   }
 
+  private void changeIgnoredEvents(long delta) {
+    ignoredEvents.add(delta);
+  }
+
   /**
    * precondition: DefaultQuery.setPdxReadSerialized(true)
    */
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
index 193066f..998cb8a 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
@@ -15,6 +15,7 @@
 package org.apache.geode.connectors.jdbc;
 
 import java.sql.SQLException;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.geode.CopyHelper;
 import org.apache.geode.annotations.Experimental;
@@ -36,6 +37,8 @@ import org.apache.geode.pdx.PdxInstance;
 @Experimental
 public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWriter<K, V> {
 
+  private final LongAdder totalEvents = new LongAdder();
+
   @SuppressWarnings("unused")
   public JdbcWriter() {
     super();
@@ -73,7 +76,11 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWrite
   }
 
   private void writeEvent(EntryEvent<K, V> event) {
+    if (eventCanBeIgnored(event.getOperation())) {
+      return;
+    }
     checkInitialized((InternalCache) event.getRegion().getRegionService());
+    totalEvents.add(1);
     try {
       getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
           getPdxNewValue(event));
@@ -105,4 +112,8 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWrite
       cache.setPdxReadSerializedOverride(initialPdxReadSerialized);
     }
   }
+
+  long getTotalEvents() {
+    return totalEvents.longValue();
+  }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
index 4b23f6e..b3baabe 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -14,17 +14,16 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
-import java.util.Properties;
-
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.CacheCallback;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.internal.cache.InternalCache;
 
 @Experimental
 public abstract class AbstractJdbcCallback implements CacheCallback {
 
   private volatile SqlHandler sqlHandler;
-  protected volatile InternalCache cache;
+  protected InternalCache cache;
 
   protected AbstractJdbcCallback() {
     // nothing
@@ -42,11 +41,6 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
     }
   }
 
-  @Override
-  public void init(Properties props) {
-    // nothing
-  }
-
   protected SqlHandler getSqlHandler() {
     return sqlHandler;
   }
@@ -57,6 +51,10 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
     }
   }
 
+  protected boolean eventCanBeIgnored(Operation operation) {
+    return operation.isLoad();
+  }
+
   private synchronized void initialize(InternalCache cache) {
     if (sqlHandler == null) {
       this.cache = cache;
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
index 3d348c1..4a55a45 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
 import org.apache.geode.internal.cache.InternalCache;
@@ -82,6 +83,16 @@ public class JdbcAsyncWriterTest {
   }
 
   @Test
+  public void ignoresLoadEvent() throws Exception {
+    writer.processEvents(Collections.singletonList(createMockEvent(Operation.LOCAL_LOAD_CREATE)));
+
+    verify(sqlHandler, times(0)).write(any(), any(), any(), any());
+    assertThat(writer.getIgnoredEvents()).isEqualTo(1);
+    assertThat(writer.getTotalEvents()).isEqualTo(1);
+    assertThat(writer.getFailedEvents()).isEqualTo(0);
+  }
+
+  @Test
   public void writesMultipleProvidedEvents() throws Exception {
     List<AsyncEvent> events = new ArrayList<>();
     events.add(createMockEvent());
@@ -95,9 +106,14 @@ public class JdbcAsyncWriterTest {
     assertThat(writer.getTotalEvents()).isEqualTo(3);
   }
 
-  private AsyncEvent createMockEvent() {
+  private AsyncEvent createMockEvent(Operation op) {
     AsyncEvent event = mock(AsyncEvent.class);
+    when(event.getOperation()).thenReturn(op);
     when(event.getRegion()).thenReturn(region);
     return event;
   }
+
+  private AsyncEvent createMockEvent() {
+    return createMockEvent(Operation.CREATE);
+  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
index 4c4e16a..ed86c82 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
@@ -300,15 +300,49 @@ public class JdbcDUnitTest implements Serializable {
       region.put(key, pdxEmployee1);
       region.invalidate(key);
 
+      JdbcWriter writer = (JdbcWriter) region.getAttributes().getCacheWriter();
+      long writeCallsCompletedBeforeGet = writer.getTotalEvents();
+
       PdxInstance result = (PdxInstance) region.get(key);
       assertThat(result.getFieldNames()).hasSize(3);
       assertThat(result.getField("id")).isEqualTo(key);
       assertThat(result.getField("name")).isEqualTo("Emp1");
       assertThat(result.getField("age")).isEqualTo(55);
+      assertThat(writer.getTotalEvents()).isEqualTo(writeCallsCompletedBeforeGet);
     });
   }
 
   @Test
+  public void getReadsFromDBWithAsyncWriter() throws Exception {
+    createTable();
+    createRegionUsingGfsh(false, true, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME);
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+      String key = "id1";
+      Region region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter) ClusterStartupRule.getCache()
+          .getAsyncEventQueue("JAW").getAsyncEventListener();
+
+      region.put(key, pdxEmployee1);
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+        assertThat(asyncWriter.getSuccessfulEvents()).isEqualTo(1);
+      });
+      region.invalidate(key);
+      PdxInstance result = (PdxInstance) region.get(key);
+
+      assertThat(result.getField("id")).isEqualTo(pdxEmployee1.getField("id"));
+      assertThat(result.getField("name")).isEqualTo(pdxEmployee1.getField("name"));
+      assertThat(result.getField("age")).isEqualTo(pdxEmployee1.getField("age"));
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+        assertThat(asyncWriter.getIgnoredEvents()).isEqualTo(1);
+      });
+    });
+  }
+
   public void getReadsFromDBWithPdxClassName() throws Exception {
     createTable();
     createRegionUsingGfsh(true, false, true);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
index 27cc370..b1773a1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.connectors.jdbc;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -28,6 +29,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.cache.SerializedCacheValue;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
@@ -43,9 +45,11 @@ public class JdbcWriterTest {
   private EntryEvent<Object, Object> entryEvent;
   private PdxInstance pdxInstance;
   private SqlHandler sqlHandler;
+  private InternalRegion region;
   private SerializedCacheValue<Object> serializedNewValue;
   private RegionEvent<Object, Object> regionEvent;
   private InternalCache cache;
+  private Object key;
 
   private JdbcWriter<Object, Object> writer;
 
@@ -54,13 +58,17 @@ public class JdbcWriterTest {
     entryEvent = mock(EntryEvent.class);
     pdxInstance = mock(PdxInstance.class);
     sqlHandler = mock(SqlHandler.class);
+    region = mock(InternalRegion.class);
     serializedNewValue = mock(SerializedCacheValue.class);
     regionEvent = mock(RegionEvent.class);
     cache = Fakes.cache();
+    key = "key";
 
-    when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+    when(entryEvent.getRegion()).thenReturn(region);
+    when(entryEvent.getKey()).thenReturn(key);
     when(entryEvent.getRegion().getRegionService()).thenReturn(cache);
     when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+    when(entryEvent.getOperation()).thenReturn(Operation.CREATE);
     when(serializedNewValue.getDeserializedValue()).thenReturn(pdxInstance);
 
     writer = new JdbcWriter<>(sqlHandler, cache);
@@ -70,7 +78,7 @@ public class JdbcWriterTest {
   public void beforeUpdateWithPdxInstanceWritesToSqlHandler() throws Exception {
     writer.beforeUpdate(entryEvent);
 
-    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+    verify(sqlHandler, times(1)).write(eq(region), eq(Operation.CREATE), eq(key), eq(pdxInstance));
   }
 
   @Test
@@ -85,14 +93,40 @@ public class JdbcWriterTest {
   public void beforeCreateWithPdxInstanceWritesToSqlHandler() throws Exception {
     writer.beforeCreate(entryEvent);
 
-    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+    verify(sqlHandler, times(1)).write(eq(region), eq(Operation.CREATE), eq(key), eq(pdxInstance));
+    assertThat(writer.getTotalEvents()).isEqualTo(1);
   }
 
   @Test
-  public void beforeDestroyWithPdxInstanceWritesToSqlHandler() throws Exception {
+  public void beforeCreateWithNewPdxInstanceWritesToSqlHandler() throws Exception {
+    PdxInstance newPdxInstance = mock(PdxInstance.class);
+    when(entryEvent.getNewValue()).thenReturn(newPdxInstance);
+    when(entryEvent.getSerializedNewValue()).thenReturn(null);
+    writer.beforeCreate(entryEvent);
+
+    verify(sqlHandler, times(1)).write(eq(region), eq(Operation.CREATE), eq(key),
+        eq(newPdxInstance));
+    assertThat(writer.getTotalEvents()).isEqualTo(1);
+  }
+
+  @Test
+  public void beforeCreateWithLoadEventDoesNothing() throws Exception {
+    when(entryEvent.getOperation()).thenReturn(Operation.LOCAL_LOAD_CREATE);
+
+    writer.beforeCreate(entryEvent);
+
+    verify(sqlHandler, times(0)).write(any(), any(), any(), any());
+    assertThat(writer.getTotalEvents()).isEqualTo(0);
+  }
+
+  @Test
+  public void beforeDestroyWithDestroyEventWritesToSqlHandler() throws Exception {
+    when(entryEvent.getOperation()).thenReturn(Operation.DESTROY);
+    when(entryEvent.getSerializedNewValue()).thenReturn(null);
+
     writer.beforeDestroy(entryEvent);
 
-    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+    verify(sqlHandler, times(1)).write(eq(region), eq(Operation.DESTROY), eq(key), eq(null));
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
index c089b76..62df1af 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
@@ -26,6 +26,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.Operation;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.fake.Fakes;
 import org.apache.geode.test.junit.categories.UnitTest;
@@ -51,6 +52,14 @@ public class AbstractJdbcCallbackTest {
   }
 
   @Test
+  public void closeDoesNothingIfSqlHandlerNull() {
+    jdbcCallback = new AbstractJdbcCallback(null, cache) {};
+    jdbcCallback.close();
+    verify(sqlHandler, times(0)).close();
+  }
+
+
+  @Test
   public void returnsCorrectSqlHander() {
     assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
   }
@@ -73,4 +82,17 @@ public class AbstractJdbcCallbackTest {
 
     assertThat(jdbcCallback.getSqlHandler()).isNotNull();
   }
+
+  @Test
+  public void verifyLoadsAreIgnored() {
+    boolean ignoreEvent = jdbcCallback.eventCanBeIgnored(Operation.LOCAL_LOAD_CREATE);
+    assertThat(ignoreEvent).isTrue();
+  }
+
+  @Test
+  public void verifyCreateAreNotIgnored() {
+    boolean ignoreEvent = jdbcCallback.eventCanBeIgnored(Operation.CREATE);
+    assertThat(ignoreEvent).isFalse();
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
dschneider@apache.org.