Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/21 06:25:53 UTC

[GitHub] zuoc closed pull request #23361: upgrade pom

zuoc closed pull request #23361: upgrade pom
URL: https://github.com/apache/spark/pull/23361

This is a pull request from a forked repository. As GitHub hides the original diff once the pull request is closed, it is reproduced below for the sake of provenance:

diff --git a/README.md b/README.md
index d861e9fee7055..c7892a68acd76 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,26 @@
+# KySpark
+
+KySpark is a customized Spark distribution for better use with KAP.
+
+The naming convention for branches is kyspark-{SPARK_VERSION}.x, e.g. kyspark-2.1.1.x.
+
+The naming convention for releases (the name can be used when creating a git tag, for example) is kyspark-{SPARK_VERSION}-r{RELEASE_NUMBER}, e.g. kyspark-2.1.1-r1.
+
+You can run:
+    
+    ./dev/make-distribution.sh --name kyspark-r1 --tgz -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn
+
+to build a new Spark binary package named **spark-2.1.1-bin-kyspark-r1.tgz**.
+
+If you want KAP to package your new KySpark build by default, you should contact hongbin.ma@kyligence.io.
+
+Steps:
+
+1. Publish the new KySpark package to the kyligence.io server.
+2. Change the md5 in KAP's download-spark.sh; **do not change Kylin's download-spark.sh**.
+3. Update the Spark version used by CI, including kap-release, kap-release-test and master-full-regression-test.
+4. Create a tag for the commit the new KySpark was built from (see the example below). Don't forget to push the tag to the server.
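+
+A minimal sketch of steps 2 and 4, assuming the kyspark-2.1.1-r1 package built above and using `<commit-sha>` as a placeholder for the commit the package was built from:
+
+    # step 2: compute the new md5 to put into KAP's download-spark.sh
+    md5sum spark-2.1.1-bin-kyspark-r1.tgz
+
+    # step 4: tag the commit the release was built from and push the tag
+    git tag kyspark-2.1.1-r1 <commit-sha>
+    git push origin kyspark-2.1.1-r1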
+
 # Apache Spark
 
 Spark is a fast and general cluster computing system for Big Data. It provides
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index dd8f5bacb9f6e..ea198aa9179d9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -122,7 +122,10 @@ private[storage] class BlockInfoManager extends Logging {
    * by [[removeBlock()]].
    */
   @GuardedBy("this")
-  private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
+  private[storage] val infos = new mutable.HashMap[BlockId, BlockInfo]
+  private[storage] val broadcaseInfosIndex = new mutable.HashMap[Long, List[BlockId]]
+  private[storage] val rddInfosIndex = new mutable.HashMap[Long, List[BlockId]]
+  private[storage] val shuffleInfosIndex = new mutable.HashMap[Long, List[BlockId]]
 
   /**
    * Tracks the set of blocks that each task has locked for writing.
@@ -324,6 +327,19 @@ private[storage] class BlockInfoManager extends Logging {
       case None =>
         // Block does not yet exist or is removed, so we are free to acquire the write lock
         infos(blockId) = newBlockInfo
+        blockId match {
+          case RDDBlockId(rddId, splitIndex) =>
+            val t = rddInfosIndex.getOrElse(rddId, Nil)
+            rddInfosIndex.put(rddId, blockId :: t)
+          case BroadcastBlockId(broadcastId, field) =>
+            val t = broadcaseInfosIndex.getOrElse(broadcastId, Nil)
+            broadcaseInfosIndex.put(broadcastId, blockId :: t)
+          case ShuffleBlockId(shuffleId, mapId, reduceId) =>
+            val t = shuffleInfosIndex.getOrElse(shuffleId, Nil)
+            shuffleInfosIndex.put(shuffleId, blockId :: t)
+          case _ =>
+        }
+
         lockForWriting(blockId)
         true
     }
@@ -412,6 +428,16 @@ private[storage] class BlockInfoManager extends Logging {
           throw new IllegalStateException(
             s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
         } else {
+          blockId match {
+            case RDDBlockId(rddId, splitIndex) =>
+              rddInfosIndex.remove(rddId)
+            case BroadcastBlockId(broadcastId, field) =>
+              broadcaseInfosIndex.remove(broadcastId)
+            case ShuffleBlockId(shuffleId, mapId, reduceId) =>
+              shuffleInfosIndex.remove(shuffleId)
+            case _ =>
+          }
+
           infos.remove(blockId)
           blockInfo.readerCount = 0
           blockInfo.writerTask = BlockInfo.NO_WRITER
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 35551e41213a3..f47422c3c4488 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1345,7 +1345,12 @@ private[spark] class BlockManager(
   def removeRdd(rddId: Int): Int = {
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
     logInfo(s"Removing RDD $rddId")
-    val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
+
+//    val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
+//    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
+//    blocksToRemove.size
+
+    val blocksToRemove = blockInfoManager.rddInfosIndex.getOrElse(rddId, Nil)
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
     blocksToRemove.size
   }
@@ -1355,9 +1360,15 @@ private[spark] class BlockManager(
    */
   def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
     logDebug(s"Removing broadcast $broadcastId")
-    val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
-      case bid @ BroadcastBlockId(`broadcastId`, _) => bid
-    }
+
+
+//    val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
+//      case bid @ BroadcastBlockId(`broadcastId`, _) => bid
+//    }
+//    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
+//    blocksToRemove.size
+
+    val blocksToRemove = blockInfoManager.broadcaseInfosIndex.getOrElse(broadcastId, Nil)
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
     blocksToRemove.size
   }
diff --git a/pom.xml b/pom.xml
index a985cf011de4b..914e4143ba461 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,7 +156,7 @@
     <commons.math3.version>3.4.1</commons.math3.version>
     <!-- managed up from 3.2.1 for SPARK-11652 -->
     <commons.collections.version>3.2.2</commons.collections.version>
-    <scala.version>2.11.8</scala.version>
+    <scala.version>2.11.12</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
     <fasterxml.jackson.version>2.6.5</fasterxml.jackson.version>
@@ -175,7 +175,7 @@
     <joda.version>2.9.3</joda.version>
     <jodd.version>3.5.2</jodd.version>
     <jsr305.version>1.3.9</jsr305.version>
-    <libthrift.version>0.9.3</libthrift.version>
+    <libthrift.version>0.10.0</libthrift.version>
     <antlr4.version>4.5.3</antlr4.version>
     <jpam.version>1.1</jpam.version>
     <selenium.version>2.52.0</selenium.version>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1ba736b221db8..bde88b42a860a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -419,8 +419,10 @@ private[spark] class Client(
       // slightly later then renewal time (80% of next renewal time). This is to make sure
       // credentials are renewed and updated before expired.
       val currTime = System.currentTimeMillis()
-      val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
-      val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
+      val renewalTime = (nearestTimeOfNextRenewal - currTime) *
+        sparkConf.get(CREDENTIAL_RENEW_TIME_COEFFICIENT) + currTime
+      val updateTime = (nearestTimeOfNextRenewal - currTime) *
+        sparkConf.get(CREDENTIAL_UPDATE_COEFFICIENT) + currTime
 
       sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
       sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index ca8c89043aa88..6dd89792a5ece 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -225,6 +225,21 @@ package object config {
       .createOptional
 
   /* Security configuration. */
+  private[spark] val CREDENTIAL_DRIVER_SKIP_UPDATE =
+    ConfigBuilder("spark.yarn.credentials.driver.skipUpdate")
+      .booleanConf
+      .createWithDefault(true)
+
+
+  private[spark] val CREDENTIAL_UPDATE_COEFFICIENT =
+    ConfigBuilder("spark.yarn.credentials.update.coefficient")
+      .doubleConf
+      .createWithDefault(0.6)
+
+  private[spark] val CREDENTIAL_RENEW_TIME_COEFFICIENT =
+    ConfigBuilder("spark.yarn.credentials.renewTime.coefficient")
+      .doubleConf
+      .createWithDefault(0.5)
 
   private[spark] val CREDENTIAL_FILE_MAX_COUNT =
     ConfigBuilder("spark.yarn.credentials.file.retention.count")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index 7e76f402db249..b45e96d68df19 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -67,6 +67,8 @@ private[yarn] class AMCredentialRenewer(
   private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
   private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
   private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
+  private val credentialUpdateCoefficient = sparkConf.get(CREDENTIAL_UPDATE_COEFFICIENT)
+  private val credentialRenewTimeCoefficient = sparkConf.get(CREDENTIAL_RENEW_TIME_COEFFICIENT)
   private val freshHadoopConf =
     hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
 
@@ -193,8 +195,9 @@ private[yarn] class AMCredentialRenewer(
     } else {
       // Next valid renewal time is about 75% of credential renewal time, and update time is
       // slightly later than valid renewal time (80% of renewal time).
-      timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
-      ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
+      timeOfNextRenewal = ((nearestNextRenewalTime - currTime) *
+        credentialRenewTimeCoefficient + currTime).toLong
+      ((nearestNextRenewalTime - currTime) * credentialUpdateCoefficient + currTime).toLong
     }
 
     // Add the temp credentials back to the original ones.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
index 41b7b5d60b038..501efc6fa7af5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
@@ -19,17 +19,18 @@ package org.apache.spark.deploy.yarn.security
 
 import java.util.concurrent.{Executors, TimeUnit}
 
-import scala.util.control.NonFatal
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 private[spark] class CredentialUpdater(
     sparkConf: SparkConf,
@@ -39,6 +40,7 @@ private[spark] class CredentialUpdater(
   @volatile private var lastCredentialsFileSuffix = 0
 
   private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+  private val skipUpdate = sparkConf.get(CREDENTIAL_DRIVER_SKIP_UPDATE)
   private val freshHadoopConf =
     SparkHadoopUtil.get.getConfBypassingFSCache(
       hadoopConf, new Path(credentialsFile).toUri.getScheme)
@@ -66,6 +68,10 @@ private[spark] class CredentialUpdater(
   }
 
   private def updateCredentialsIfRequired(): Unit = {
+    if (skipUpdate && (SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)) {
+      logInfo("Skip update token with driver.")
+      return
+    }
     val timeToNextUpdate = try {
       val credentialsFilePath = new Path(credentialsFile)
       val remoteFs = FileSystem.get(freshHadoopConf)
@@ -78,7 +84,18 @@ private[spark] class CredentialUpdater(
             logInfo("Reading new credentials from " + credentialsStatus.getPath)
             val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
             lastCredentialsFileSuffix = suffix
+            newCredentials.getAllTokens.asScala
+              .map(_.decodeIdentifier())
+              .filter(_.isInstanceOf[DelegationTokenIdentifier])
+              .map(_.toString)
+              .foreach(logInfo(_))
             UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+            val tokens = UserGroupInformation.getCurrentUser.getCredentials.getAllTokens
+            tokens.asScala
+              .map(_.decodeIdentifier())
+              .filter(_.isInstanceOf[DelegationTokenIdentifier])
+              .map(_.toString)
+              .foreach(logInfo(_))
             logInfo("Credentials updated from credentials file.")
 
             val remainingTime = (getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org