You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/29 08:30:50 UTC

carbondata git commit: [CARBONDATA-1833] Fix BindException in TestStreamingTableOperation

Repository: carbondata
Updated Branches:
  refs/heads/master 33de599a5 -> 5af6d27ab


[CARBONDATA-1833] Fix BindException in TestStreamingTableOperation

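The test case TestStreamingTableOperation throws "BindException: Address already in use" because the socket port is hard-coded to 7071.
This PR fixes it by retrying with a different port number when binding the server socket fails.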

This closes #1586


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5af6d27a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5af6d27a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5af6d27a

Branch: refs/heads/master
Commit: 5af6d27aba9a24241fcd54b2816babb228fa96e0
Parents: 33de599
Author: QiangCai <qi...@qq.com>
Authored: Wed Nov 29 14:29:00 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 29 16:30:38 2017 +0800

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala           | 50 +++++++++++++++-----
 1 file changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5af6d27a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index cb641e1..a5268fb 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.carbondata
 
 import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-import java.util.{Calendar, Date}
+import java.net.{BindException, ServerSocket}
 import java.util.concurrent.Executors
 
 import scala.collection.mutable
@@ -35,7 +34,6 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
@@ -204,12 +202,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
-      server = new ServerSocket(7071)
+      server = getServerSocket
       val thread1 = createWriteSocketThread(server, 2, 10, 1)
       thread1.start()
       // use thread pool to catch the exception of sink thread
       val pool = Executors.newSingleThreadExecutor()
-      val thread2 = createSocketStreamingThread(spark, tablePath, identifier)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier)
       val future = pool.submit(thread2)
       Thread.sleep(1000)
       thread1.interrupt()
@@ -637,7 +635,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
     val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
 
-    sql(s"delete from table streaming.stream_table_delete where segment.starttime before '2999-10-01 01:00:00'")
+    sql(s"delete from table streaming.stream_table_delete where segment.starttime before " +
+        s"'2999-10-01 01:00:00'")
 
     val rows = sql("show segments for table streaming.stream_table_delete").collect()
     assertResult(beforeDelete.length)(rows.length)
@@ -664,7 +663,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     try {
       sql("ALTER TABLE stream_table_alter SET TBLPROPERTIES('streaming'='false')")
-      assert(false,  "unsupport disable streaming properties")
+      assert(false, "unsupport disable streaming properties")
     } catch {
       case _ =>
         assert(true)
@@ -756,6 +755,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
   def createSocketStreamingThread(
       spark: SparkSession,
+      port: Int,
       tablePath: CarbonTablePath,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
@@ -768,7 +768,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           val readSocketDF = spark.readStream
             .format("socket")
             .option("host", "localhost")
-            .option("port", 7071)
+            .option("port", port)
             .load()
 
           // Write data from socket stream to carbondata file
@@ -815,8 +815,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
-      server = new ServerSocket(7071)
-
+      server = getServerSocket()
       val thread1 = createWriteSocketThread(
         serverSocket = server,
         writeNums = batchNums,
@@ -825,7 +824,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         badRecords = generateBadRecords)
       val thread2 = createSocketStreamingThread(
         spark = spark,
-        tablePath = tablePath, identifier,
+        port = server.getLocalPort,
+        tablePath = tablePath,
+        tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
         handoffSize = handoffSize)
@@ -966,4 +967,31 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   def wrap(array: Array[String]) = {
     new mutable.WrappedArray.ofRef(array)
   }
+
+  /**
+   * Opens a ServerSocket, trying port 7071 first. On a BindException it
+   * retries on port + 2 until the next candidate port would be >= 65535.
+   *
+   * @return the bound ServerSocket; the last BindException is rethrown otherwise
+   */
+  def getServerSocket(): ServerSocket = {
+    var port = 7071
+    var serverSocket: ServerSocket = null
+    var retry = false
+    do {
+      try {
+        retry = false // cleared on every attempt so a successful bind ends the loop
+        serverSocket = new ServerSocket(port)
+      } catch {
+        case ex: BindException =>
+          retry = true
+          port = port + 2
+          if (port >= 65535) { // give up here; port 65535 itself is never tried
+            throw ex
+          }
+      }
+    } while (retry)
+    serverSocket
+  }
+
 }