You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:01:04 UTC

[pulsar] 01/07: Load credentials from secrets for Kinesis connectors (#10822)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3f000b8f8d7e35c62897e3f78c3fe5769e9b8f5d
Author: Yang Yang <yy...@streamnative.io>
AuthorDate: Tue Jun 8 20:01:30 2021 +0800

    Load credentials from secrets for Kinesis connectors (#10822)
    
    ### Motivation
    
    Support loading `awsCredentialPluginParam` from secrets for kinesis connectors to avoid leaking of sensitive information with best efforts.
    
    ### Modifications
    
    Load kinesis configs with `IOConfigUtils.loadWithSecrets()`.
    
    ### Verifying this change
    
    This change is already covered by existing tests and this PR also adds tests to confirm both loading credentials from plaintext config object and secrets work as expected.
    
    
    (cherry picked from commit cb79032b84228207122bd666b0553cc82e442a4e)
---
 pulsar-io/kinesis/pom.xml                          |  5 ++
 .../pulsar/io/kinesis/BaseKinesisConfig.java       |  1 +
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 11 ++--
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 21 +++----
 .../apache/pulsar/io/kinesis/KinesisSource.java    | 19 +++---
 .../pulsar/io/kinesis/KinesisSourceConfig.java     |  5 --
 .../pulsar/io/kinesis/KinesisSinkConfigTests.java  | 36 +++++++++--
 .../io/kinesis/KinesisSourceConfigTests.java       | 73 +++++++++++++++++-----
 8 files changed, 118 insertions(+), 53 deletions(-)

diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index cd1ad28..0271ea9 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -31,6 +31,11 @@
   <name>Pulsar IO :: Kinesis</name>
 
   <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>${project.groupId}</groupId>
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
index cad2ac5..92da3e6 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
@@ -64,6 +64,7 @@ public abstract class BaseKinesisConfig implements Serializable {
     @FieldDoc(
         required = false,
         defaultValue = "",
+        sensitive = true,
         help = "json-parameters to initialize `AwsCredentialsProviderPlugin`")
     private String awsCredentialPluginParam = "";
 
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index ed7c70c..c6fe369 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -50,6 +50,7 @@ import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.common.IOConfigUtils;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
@@ -100,7 +101,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
     private static final int maxPartitionedKeyLength = 256;
     private SinkContext sinkContext;
     private ScheduledExecutorService scheduledExecutor;
-    // 
+    //
     private static final int FALSE = 0;
     private static final int TRUE = 1;
     private volatile int previousPublishFailed = FALSE;
@@ -153,12 +154,12 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
         scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-        kinesisSinkConfig = KinesisSinkConfig.load(config);
+        kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
         this.sinkContext = sinkContext;
 
         checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
-        checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()) || 
-                      isNotBlank(kinesisSinkConfig.getAwsRegion()), 
+        checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()) ||
+                      isNotBlank(kinesisSinkConfig.getAwsRegion()),
                       "Either the aws-end-point or aws-region must be set");
         checkArgument(isNotBlank(kinesisSinkConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
 
@@ -287,4 +288,4 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
         }
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index fa00550..61b0c76 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.io.core.annotations.FieldDoc;
 public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
-    
+
     @FieldDoc(
         required = true,
         defaultValue = "ONLY_RAW_PAYLOAD",
@@ -57,7 +57,7 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
             + "  #   properties and encryptionCtx, and publishes flatbuffer payload into the configured kinesis stream."
     )
     private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD
-    
+
     @FieldDoc(
         required = false,
         defaultValue = "false",
@@ -81,11 +81,6 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
         return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);
     }
 
-    public static KinesisSinkConfig load(Map<String, Object> map) throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), KinesisSinkConfig.class);
-    }
-    
     public static enum MessageFormat {
         /**
          * Kinesis sink directly publishes pulsar-payload as a message into the kinesis-stream
@@ -94,13 +89,13 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
         /**
          * Kinesis sink creates a json payload with message-payload, properties and encryptionCtx and publishes json
          * payload to kinesis stream.
-         * 
-         * schema: 
+         *
+         * schema:
          * {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLI [...]
          * Example:
          * {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
-         * 
-         * 
+         *
+         *
          */
         FULL_MESSAGE_IN_JSON,
         /**
@@ -108,5 +103,5 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
          */
         FULL_MESSAGE_IN_FB;
     }
-    
-}
\ No newline at end of file
+
+}
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
index 598f3af..655c2f7 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
@@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.common.IOConfigUtils;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
@@ -43,7 +44,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
 import software.amazon.kinesis.retrieval.polling.PollingConfig;
 
 /**
- * 
+ *
  * @see ConfigsBuilder
  */
 @Connector(
@@ -72,23 +73,23 @@ public class KinesisSource extends AbstractAwsConnector implements Source<byte[]
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
-        this.kinesisSourceConfig = KinesisSourceConfig.load(config);
-        
+        this.kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, KinesisSourceConfig.class, sourceContext);
+
         checkArgument(isNotBlank(kinesisSourceConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
-        checkArgument(isNotBlank(kinesisSourceConfig.getAwsEndpoint()) || 
-                      isNotBlank(kinesisSourceConfig.getAwsRegion()), 
+        checkArgument(isNotBlank(kinesisSourceConfig.getAwsEndpoint()) ||
+                      isNotBlank(kinesisSourceConfig.getAwsRegion()),
                      "Either the aws-end-point or aws-region must be set");
         checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
-        
+
         if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
             checkArgument((kinesisSourceConfig.getStartAtTime() != null),"Timestamp must be specified");
         }
-        
+
         queue = new LinkedBlockingQueue<KinesisRecord> (kinesisSourceConfig.getReceiveQueueSize());
         workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
-        
+
         AwsCredentialProviderPlugin credentialsProvider = createCredentialProvider(
-                kinesisSourceConfig.getAwsCredentialPluginName(), 
+                kinesisSourceConfig.getAwsCredentialPluginName(),
                 kinesisSourceConfig.getAwsCredentialPluginParam());
 
         KinesisAsyncClient kClient = kinesisSourceConfig.buildKinesisAsyncClient(credentialsProvider);
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
index d75b521..cd9e313 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
@@ -146,11 +146,6 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
         return mapper.readValue(new File(yamlFile), KinesisSourceConfig.class);
     }
 
-    public static KinesisSourceConfig load(Map<String, Object> map) throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), KinesisSourceConfig.class);
-    }
-
     public KinesisAsyncClient buildKinesisAsyncClient(AwsCredentialProviderPlugin credPlugin) {
         KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
 
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
index 642bd9f..dd799e0 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
@@ -26,7 +26,10 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 public class KinesisSinkConfigTests {
@@ -35,7 +38,7 @@ public class KinesisSinkConfigTests {
     public final void loadFromYamlFileTest() throws IOException {
         File yamlFile = getFile("sinkConfig.yaml");
         KinesisSinkConfig config = KinesisSinkConfig.load(yamlFile.getAbsolutePath());
-        
+
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
         assertEquals(config.getAwsRegion(), "us-east-1");
@@ -45,7 +48,7 @@ public class KinesisSinkConfigTests {
         assertEquals(config.getMessageFormat(), MessageFormat.ONLY_RAW_PAYLOAD);
         assertEquals(true, config.isRetainOrdering());
     }
-    
+
     @Test
     public final void loadFromMapTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
@@ -53,9 +56,30 @@ public class KinesisSinkConfigTests {
         map.put("awsRegion", "us-east-1");
         map.put("awsKinesisStreamName", "my-stream");
         map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
-        
-        KinesisSinkConfig config = KinesisSinkConfig.load(map);
-        
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext);
+
+        assertNotNull(config);
+        assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
+        assertEquals(config.getAwsRegion(), "us-east-1");
+        assertEquals(config.getAwsKinesisStreamName(), "my-stream");
+        assertEquals(config.getAwsCredentialPluginParam(),
+                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+    }
+
+    @Test
+    public final void loadFromMapCredentialFromSecretTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws");
+        map.put("awsRegion", "us-east-1");
+        map.put("awsKinesisStreamName", "my-stream");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        Mockito.when(sinkContext.getSecret("awsCredentialPluginParam"))
+                .thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext);
+
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
         assertEquals(config.getAwsRegion(), "us-east-1");
@@ -63,7 +87,7 @@ public class KinesisSinkConfigTests {
         assertEquals(config.getAwsCredentialPluginParam(),
                 "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
     }
-    
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
index b79e126..4bca8e7 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
@@ -30,6 +30,9 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 import software.amazon.kinesis.common.InitialPositionInStream;
 
@@ -37,7 +40,7 @@ import software.amazon.kinesis.common.InitialPositionInStream;
 public class KinesisSourceConfigTests {
 
     private static final Date DAY;
-    
+
     static {
         Calendar then = Calendar.getInstance();
         then.set(Calendar.YEAR, 2019);
@@ -67,14 +70,14 @@ public class KinesisSourceConfigTests {
         assertEquals(config.getNumRetries(), 3);
         assertEquals(config.getReceiveQueueSize(), 2000);
         assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
-        
+
         Calendar cal = Calendar.getInstance();
         cal.setTime(config.getStartAtTime());
         ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC);
         ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC);
         assertEquals(actual, expected);
     }
-    
+
     @Test
     public final void loadFromMapTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
@@ -89,9 +92,10 @@ public class KinesisSourceConfigTests {
         map.put("applicationName", "My test application");
         map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON);
         map.put("startAtTime", DAY);
-        
-        KinesisSourceConfig config = KinesisSourceConfig.load(map);
-        
+
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSourceConfig.class, sourceContext);
+
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
         assertEquals(config.getAwsRegion(), "us-east-1");
@@ -104,41 +108,80 @@ public class KinesisSourceConfigTests {
         assertEquals(config.getNumRetries(), 3);
         assertEquals(config.getReceiveQueueSize(), 2000);
         assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
-        
+
         Calendar cal = Calendar.getInstance();
         cal.setTime(config.getStartAtTime());
         ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC);
         ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC);
         assertEquals(actual, expected);
     }
-    
-    @Test(expectedExceptions = IllegalArgumentException.class, 
+
+    @Test
+    public final void loadFromMapCredentialFromSecretTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws");
+        map.put("awsRegion", "us-east-1");
+        map.put("awsKinesisStreamName", "my-stream");
+        map.put("checkpointInterval", "30000");
+        map.put("backoffTime", "4000");
+        map.put("numRetries", "3");
+        map.put("receiveQueueSize", 2000);
+        map.put("applicationName", "My test application");
+        map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON);
+        map.put("startAtTime", DAY);
+
+        SourceContext sourceContext = Mockito.mock(SourceContext.class);
+        Mockito.when(sourceContext.getSecret("awsCredentialPluginParam"))
+                .thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSourceConfig.class, sourceContext);
+
+        assertNotNull(config);
+        assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
+        assertEquals(config.getAwsRegion(), "us-east-1");
+        assertEquals(config.getAwsKinesisStreamName(), "my-stream");
+        assertEquals(config.getAwsCredentialPluginParam(),
+                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        assertEquals(config.getApplicationName(), "My test application");
+        assertEquals(config.getCheckpointInterval(), 30000);
+        assertEquals(config.getBackoffTime(), 4000);
+        assertEquals(config.getNumRetries(), 3);
+        assertEquals(config.getReceiveQueueSize(), 2000);
+        assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(config.getStartAtTime());
+        ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC);
+        ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC);
+        assertEquals(actual, expected);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
             expectedExceptionsMessageRegExp = "empty aws-credential param")
     public final void missingCredentialsTest() throws Exception {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("awsEndpoint", "https://some.endpoint.aws");
         map.put("awsRegion", "us-east-1");
         map.put("awsKinesisStreamName", "my-stream");
-     
+
         KinesisSource source = new KinesisSource();
         source.open(map, null);
     }
-    
-    @Test(expectedExceptions = IllegalArgumentException.class, 
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
             expectedExceptionsMessageRegExp = "Timestamp must be specified")
     public final void missingStartTimeTest() throws Exception {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("awsEndpoint", "https://some.endpoint.aws");
         map.put("awsRegion", "us-east-1");
         map.put("awsKinesisStreamName", "my-stream");
-        map.put("awsCredentialPluginParam", 
+        map.put("awsCredentialPluginParam",
                 "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
         map.put("initialPositionInStream", InitialPositionInStream.AT_TIMESTAMP);
-     
+
         KinesisSource source = new KinesisSource();
         source.open(map, null);
     }
-    
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());