You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/29 08:30:50 UTC
carbondata git commit: [CARBONDATA-1833] Fix BindException in
TestStreamingTableOperation
Repository: carbondata
Updated Branches:
refs/heads/master 33de599a5 -> 5af6d27ab
[CARBONDATA-1833] Fix BindException in TestStreamingTableOperation
Test case TestStreamingTableOperation throwing BindException: Address already in use.
This PR will fix it.
This closes #1586
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5af6d27a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5af6d27a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5af6d27a
Branch: refs/heads/master
Commit: 5af6d27aba9a24241fcd54b2816babb228fa96e0
Parents: 33de599
Author: QiangCai <qi...@qq.com>
Authored: Wed Nov 29 14:29:00 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 29 16:30:38 2017 +0800
----------------------------------------------------------------------
.../TestStreamingTableOperation.scala | 50 +++++++++++++++-----
1 file changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5af6d27a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index cb641e1..a5268fb 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -18,8 +18,7 @@
package org.apache.spark.carbondata
import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-import java.util.{Calendar, Date}
+import java.net.{BindException, ServerSocket}
import java.util.concurrent.Executors
import scala.collection.mutable
@@ -35,7 +34,6 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
@@ -204,12 +202,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
var server: ServerSocket = null
try {
- server = new ServerSocket(7071)
+ server = getServerSocket
val thread1 = createWriteSocketThread(server, 2, 10, 1)
thread1.start()
// use thread pool to catch the exception of sink thread
val pool = Executors.newSingleThreadExecutor()
- val thread2 = createSocketStreamingThread(spark, tablePath, identifier)
+ val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier)
val future = pool.submit(thread2)
Thread.sleep(1000)
thread1.interrupt()
@@ -637,7 +635,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
- sql(s"delete from table streaming.stream_table_delete where segment.starttime before '2999-10-01 01:00:00'")
+ sql(s"delete from table streaming.stream_table_delete where segment.starttime before " +
+ s"'2999-10-01 01:00:00'")
val rows = sql("show segments for table streaming.stream_table_delete").collect()
assertResult(beforeDelete.length)(rows.length)
@@ -664,7 +663,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
try {
sql("ALTER TABLE stream_table_alter SET TBLPROPERTIES('streaming'='false')")
- assert(false, "unsupport disable streaming properties")
+ assert(false, "unsupport disable streaming properties")
} catch {
case _ =>
assert(true)
@@ -756,6 +755,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
def createSocketStreamingThread(
spark: SparkSession,
+ port: Int,
tablePath: CarbonTablePath,
tableIdentifier: TableIdentifier,
badRecordAction: String = "force",
@@ -768,7 +768,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
val readSocketDF = spark.readStream
.format("socket")
.option("host", "localhost")
- .option("port", 7071)
+ .option("port", port)
.load()
// Write data from socket stream to carbondata file
@@ -815,8 +815,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
var server: ServerSocket = null
try {
- server = new ServerSocket(7071)
-
+ server = getServerSocket()
val thread1 = createWriteSocketThread(
serverSocket = server,
writeNums = batchNums,
@@ -825,7 +824,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
badRecords = generateBadRecords)
val thread2 = createSocketStreamingThread(
spark = spark,
- tablePath = tablePath, identifier,
+ port = server.getLocalPort,
+ tablePath = tablePath,
+ tableIdentifier = identifier,
badRecordAction = badRecordAction,
intervalSecond = intervalOfIngest,
handoffSize = handoffSize)
@@ -966,4 +967,31 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
def wrap(array: Array[String]) = {
new mutable.WrappedArray.ofRef(array)
}
+
+ /**
+ * get a ServerSocket
+ * if the address was already used, it will retry to use new port number.
+ *
+ * @return ServerSocket
+ */
+ def getServerSocket(): ServerSocket = {
+ var port = 7071
+ var serverSocket: ServerSocket = null
+ var retry = false
+ do {
+ try {
+ retry = false
+ serverSocket = new ServerSocket(port)
+ } catch {
+ case ex: BindException =>
+ retry = true
+ port = port + 2
+ if (port >= 65535) {
+ throw ex
+ }
+ }
+ } while (retry)
+ serverSocket
+ }
+
}