You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2020/04/03 18:38:31 UTC
[geode-kafka-connector] branch master updated: Code improvements:
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d2b2f54 Code improvements:
d2b2f54 is described below
commit d2b2f54cb77eed373d08e1ba88036f0847528ff5
Author: Nabarun Nag <na...@cs.wisc.edu>
AuthorDate: Fri Apr 3 11:35:34 2020 -0700
Code improvements:
* Exceptions being thrown as ConnectException
* Return values are used.
* static analyzer recommendations are implemented.
---
.../geode/kafka/converter/JsonPdxConverter.java | 2 +-
.../org/apache/geode/kafka/sink/BatchRecords.java | 8 +++----
.../geode/kafka/sink/GeodeKafkaSinkTask.java | 18 +++++++++++----
.../geode/kafka/sink/GeodeSinkConnectorConfig.java | 6 +----
.../geode/kafka/source/GeodeKafkaSourceTask.java | 27 ++++++++++++++--------
.../apache/geode/kafka/utils/EnumValidator.java | 2 +-
.../kafka-connect-geode-version.properties | 2 +-
.../kafka/converter/JsonPdxConverterDUnitTest.java | 6 ++---
.../kafka/source/GeodeKafkaSourceTaskTest.java | 5 ----
9 files changed, 41 insertions(+), 35 deletions(-)
diff --git a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
index e630b4e..03a6f0a 100644
--- a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
+++ b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
@@ -28,7 +28,7 @@ public class JsonPdxConverter implements Converter {
public static final String JSON_TYPE_ANNOTATION = "\"@type\"";
// Default value = false
public static final String ADD_TYPE_ANNOTATION_TO_JSON = "add-type-annotation-to-json";
- private Map<String, String> internalConfig = new HashMap<>();
+ final private Map<String, String> internalConfig = new HashMap<>();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 7974abd..909cd7c 100644
--- a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -44,8 +44,8 @@ public class BatchRecords {
public void addRemoveOperation(SinkRecord record) {
// if a previous operation added to the update map
- // let's just remove it so we don't do a put and then a remove
- // depending on the order of operations (putAll then removeAll or removeAll or putAll)...
+ // let's just remove it so we don't do a put and then a remove
+ // depending on the order of operations (putAll then removeAll or putAll)...
// ...we could remove one of the if statements.
if (updateMap.containsKey(record.key())) {
updateMap.remove(record.key());
@@ -56,7 +56,7 @@ public class BatchRecords {
public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
// it's assumed the records in are order
- // if so if a previous value was in the remove list
+ // if so, then if a previous value was in the remove list
// let's not remove it at the end of this operation
if (nullValuesMeansRemove) {
removeList.remove(record.key());
@@ -70,7 +70,7 @@ public class BatchRecords {
region.putAll(updateMap);
region.removeAll(removeList);
} else {
- logger.info("Unable to locate proxy region is null");
+ logger.info("Unable to locate a proxy region. Value is null");
}
}
}
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 1f50ea4..873d89b 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.kafka.GeodeContext;
import org.apache.geode.kafka.Version;
@@ -61,13 +62,20 @@ public class GeodeKafkaSinkTask extends SinkTask {
GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getSecurityClientAuthInit(),
- geodeConnectorConfig.getSecurityUserName(),
- geodeConnectorConfig.getSecurityPassword(),
- geodeConnectorConfig.usesSecurity());
+ ClientCache clientCache =
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(),
+ geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
+ if (clientCache == null) {
+ throw new ConnectException("Unable to start client cache in the sink task");
+ }
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
+ if (e instanceof ConnectException) {
+ throw e;
+ }
throw new ConnectException("Unable to start sink task", e);
}
}
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 2ace395..7a60513 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -79,11 +79,7 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
}
public boolean getNullValueBehavior() {
- if (nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE)) {
- return true;
- } else {
- return false;
- }
+ return nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE);
}
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 13e5b60..4ce7371 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqQuery;
@@ -72,14 +73,17 @@ public class GeodeKafkaSourceTask extends SourceTask {
GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getDurableClientId(),
- geodeConnectorConfig.getDurableClientTimeout(),
- geodeConnectorConfig.getSecurityClientAuthInit(),
- geodeConnectorConfig.getSecurityUserName(),
- geodeConnectorConfig.getSecurityPassword(),
- geodeConnectorConfig.usesSecurity());
-
+ ClientCache clientCache =
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeConnectorConfig.getDurableClientId(),
+ geodeConnectorConfig.getDurableClientTimeout(),
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(),
+ geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
+ if (clientCache == null) {
+ throw new ConnectException("Unable to create client cache in the source task");
+ }
batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -92,9 +96,12 @@ public class GeodeKafkaSourceTask extends SourceTask {
loadEntireRegion);
logger.info("Started Apache Geode source task");
} catch (Exception e) {
- e.printStackTrace();
logger.error("Unable to start source task", e);
- throw e;
+ if (e instanceof ConnectException) {
+ throw e;
+ } else {
+ throw new ConnectException(e);
+ }
}
}
diff --git a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
index 1c9e1ff..f7c2a61 100644
--- a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
+++ b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
@@ -28,7 +28,7 @@ public class EnumValidator implements ConfigDef.Validator {
}
public static <T> EnumValidator in(T[] enumerators) {
- Set<String> validValues = new HashSet<String>(enumerators.length);
+ Set<String> validValues = new HashSet<>(enumerators.length);
for (T e : enumerators) {
validValues.add(e.toString().toLowerCase());
}
diff --git a/src/main/resources/kafka-connect-geode-version.properties b/src/main/resources/kafka-connect-geode-version.properties
index 5ea51b3..5efe2ea 100644
--- a/src/main/resources/kafka-connect-geode-version.properties
+++ b/src/main/resources/kafka-connect-geode-version.properties
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-version=${project.version}
\ No newline at end of file
+version=${project.version}
\ No newline at end of file
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
index 46470c1..dfb990b 100644
--- a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
@@ -57,16 +57,16 @@ import org.apache.geode.test.dunit.rules.MemberVM;
public class JsonPdxConverterDUnitTest {
@Rule
- public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+ final public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
@Rule
public TestName testName = new TestName();
@ClassRule
- public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
+ final public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
@Rule
- public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+ final public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
@BeforeClass
public static void setup()
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 411316c..92c78e2 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -71,11 +71,6 @@ public class GeodeKafkaSourceTaskTest {
public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
GeodeContext geodeContext = mock(GeodeContext.class);
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100);
- CqResults<Object> fakeInitialResults = new ResultsBag();
- for (int i = 0; i < 10; i++) {
- fakeInitialResults.add(mock(CqEvent.class));
- }
-
when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean()))
.thenReturn(mock(CqQuery.class));
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();