You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/21 20:39:38 UTC

spark git commit: [SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCredentials

Repository: spark
Updated Branches:
  refs/heads/master 8730fbb47 -> 4b7ff3092


[SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCredentials

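SerializableAWSCredentials extended BasicAWSCredentials, which lacks a default (no-argument) constructor; this causes Java deserialization of SerializableAWSCredentials to fail. This occurs only when the AWS credentials are explicitly specified through KinesisUtils. The fix makes SerializableAWSCredentials implement the AWSCredentials interface directly instead of extending BasicAWSCredentials.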

Author: Tathagata Das <ta...@gmail.com>

Closes #6316 from tdas/SPARK-7787 and squashes the following commits:

248ca5c [Tathagata Das] Fixed serializability


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b7ff309
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b7ff309
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b7ff309

Branch: refs/heads/master
Commit: 4b7ff3092c53827817079e0810563cbb0b9d0747
Parents: 8730fbb
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu May 21 11:39:32 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu May 21 11:39:32 2015 -0700

----------------------------------------------------------------------
 .../streaming/kinesis/KinesisReceiver.scala     |  5 +++-
 .../kinesis/KinesisReceiverSuite.scala          | 30 +++++++++-----------
 2 files changed, 17 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b7ff309/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 9016449..800202e 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -31,7 +31,10 @@ import org.apache.spark.util.Utils
 
 private[kinesis]
 case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
-  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
+  extends AWSCredentials {
+  override def getAWSAccessKeyId: String = accessKeyId
+  override def getAWSSecretKey: String = secretKey
+}
 
 /**
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.

http://git-wip-us.apache.org/repos/asf/spark/blob/4b7ff309/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 7c17ee9..cd19c33 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -20,27 +20,18 @@ import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions.seqAsJavaList
 
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Milliseconds
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.util.{ManualClock, Clock}
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.Matchers
-import org.scalatest.mock.MockitoSugar
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /**
  * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
@@ -99,6 +90,11 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     ssc.stop()
   }
 
+  test("check serializability of SerializableAWSCredentials") {
+    Utils.deserialize[SerializableAWSCredentials](
+      Utils.serialize(new SerializableAWSCredentials("x", "y")))
+  }
+
   test("process records including store and checkpoint") {
     when(receiverMock.isStopped()).thenReturn(false)
     when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org