Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/12/15 07:22:11 UTC

[GitHub] [hive] wangyum opened a new pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

wangyum opened a new pull request #2878:
URL: https://github.com/apache/hive/pull/2878


   ### What changes were proposed in this pull request?
   
   Add `UploadData` and `DownloadData` to TCLIService.thrift.
   
   ### Why are the changes needed?
   
   Supporting downloads of large amounts of data (for example, more than 50 GB) through JDBC is very useful.
   
   Snowflake has similar support:
   https://docs.snowflake.com/en/user-guide/jdbc-using.html#label-jdbc-download-from-stage-to-stream
   https://github.com/snowflakedb/snowflake-jdbc/blob/95a7d8a03316093430dc3960df6635643208b6fd/src/main/java/net/snowflake/client/jdbc/SnowflakeConnectionV1.java#L886
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   // TODO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] wangyum commented on a change in pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #2878:
URL: https://github.com/apache/hive/pull/2878#discussion_r784488086



##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path
+
+  // The data to be transferred
+  4: required binary values
+}
+
+struct TUploadDataResp {
+  1: required TStatus status
+  2: required TOperationHandle operationHandle
+}
+
+// DownloadData()
+//
+// Download data to JDBC client.
+struct TDownloadDataReq {
+  // The session to download data
+  1: required TSessionHandle sessionHandle
+
+  // The download table name
+  2: optional TPatternOrIdentifier tableName
+
+  // The download query
+  3: optional string query
+
+  // The download file format
+  4: optional string format

Review comment:
       For example, if we want to download data in CSV format, we don't need to care about the table's storage format:
   ```scala
   spark.table("tableName").write.format("csv").option("dateFormat", "yyyy-MM-dd").save("/tmp/spark/download/sessionid/queryid")
   ```
   and then download data from `"/tmp/spark/download/sessionid/queryid"`.
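
A minimal sketch of the idea in the comment above: the output format and write options are chosen per download request, independently of how the table is stored. It is written in Java so that it runs without Spark. `DownloadOptions`, `resolveFormat`, `merge`, and the default values are assumptions for illustration and are not part of the Hive PR.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper; none of these names exist in Hive or Spark.
final class DownloadOptions {
    // Assumed server-side defaults, loosely modelled on the CSV options in this thread.
    static final Map<String, String> DEFAULTS = Map.of(
        "dateFormat", "yyyy-MM-dd",
        "delimiter", ",",
        "header", "true");

    // Fall back to CSV when the request leaves the optional `format` field unset.
    // Treating an empty string as unset is an assumption of this sketch.
    static String resolveFormat(String requested) {
        if (requested == null || requested.isEmpty()) {
            return "csv";
        }
        return requested.toLowerCase(Locale.ROOT);
    }

    // Client-supplied options override the defaults; null means "no overrides".
    static Map<String, String> merge(Map<String, String> clientOptions) {
        Map<String, String> merged = new LinkedHashMap<>(DEFAULTS);
        if (clientOptions != null) {
            merged.putAll(clientOptions);
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(resolveFormat(null));
        System.out.println(merge(Map.of("delimiter", "|")));
    }
}
```

Keeping the defaults on the server and letting the client override single keys means a request only has to carry the options it wants to change.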
   
   






[GitHub] [hive] wangyum commented on pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-994669275


   This is an example of how Spark implements `DownloadDataOperation`:
   ```scala
   private[hive] case class DownloadDataBlock(
       path: Option[Path] = None,
       offset: Option[Long] = None,
       schema: Option[String] = None,
       dataSize: Long)
   
   private[hive] class SparkDownloadDataOperation(
       val sqlContext: SQLContext,
       parentSession: HiveSession,
       tableName: String,
       query: String,
       format: String,
       options: JMap[String, String],
       runInBackground: Boolean)
     extends Operation(
       parentSession,
       Map.empty[String, String].asJava,
       OperationType.UNKNOWN_OPERATION,
       runInBackground) with SparkOperation with QueryLogging with Logging {
   
     private var result: DataFrame = _
   
     private lazy val resultSchema: TableSchema = {
       if (result == null || result.schema.isEmpty) {
         new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
       } else {
         logInfo(s"Result Schema: ${result.schema}")
         SparkExecuteStatementOperation.getTableSchema(result.schema)
       }
     }
   
     private val pathFilter = new PathFilter {
       override def accept(path: Path): Boolean =
         !path.getName.equals("_SUCCESS") && !path.getName.endsWith("crc")
     }
   
     private val defaultBlockSize = 10 * 1024 * 1024
   
     // Please see CSVOptions for more details.
     private val defaultOptions = Map(
       "timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
       "dateFormat" -> "yyyy-MM-dd",
       "delimiter" -> ",",
       "escape" -> "\"",
       "compression" -> "gzip",
       "header" -> "true",
       "maxRecordsPerFile" -> "0",
       // To avoid Zeta client timeout
       "fetchBlockSize" -> defaultBlockSize.toString,
       "maxFetchBlockTime" -> "30000",
       // To avoid coalesce
       "minFileSize" -> (defaultBlockSize - 1 * 1024 * 1024).toString)
   
     private val writeOptions =
       defaultOptions ++ Option(options).map(_.asScala).getOrElse(Map.empty[String, String]).toMap
     private val numFiles = writeOptions.get("numFiles").map(_.toInt)
   
     private val fetchSize = writeOptions("fetchBlockSize").toLong
   
     private val maxFetchBlockTime = writeOptions("maxFetchBlockTime").toLong
   
     private val minFileSize = writeOptions("minFileSize").toLong
   
     private val downloadQuery = s"Generating download files with arguments " +
       s"[${tableName}, ${query}, ${format}, ${writeOptions}]"
   
     private val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
     private val scratchDir = sqlContext.conf.getConf(StaticSQLConf.SPARK_SCRATCH_DIR)
     private val pathPrefix = new Path(scratchDir + File.separator + "DownloadData" + File.separator +
       parentSession.getUserName + File.separator + parentSession.getSessionHandle.getSessionId)
     private val fs: FileSystem = pathPrefix.getFileSystem(hadoopConf)
   
     private var iter: JIterator[DownloadDataBlock] = _
     private var schemaStr: String = _
     private var totalDataSize: Long = 0
   
     override def close(): Unit = {
       // CARMEL-4662 Fix Download query state is incorrect.
       if (getStatus.getState eq OperationState.FINISHED) {
         HiveThriftServer2.eventManager.onStatementFinish(statementId)
       }
       HiveThriftServer2.eventManager.onQueryExist(
         statementId,
         QueryLogObjectList(Option(result).map(_.queryExecution)),
         QueryLogExtInfo(false, totalDataSize))
       logInfo(s"CLOSING $statementId")
       cleanup(OperationState.CLOSED)
       sqlContext.sparkContext.clearJobGroup()
     }
   
     override def runInternal(): Unit = {
       setState(OperationState.PENDING)
       setHasResultSet(true)
   
       if (!runInBackground) {
         execute()
       } else {
         val sparkServiceUGI = HiveShimsUtils.getUGI()
   
         val backgroundOperation = new Runnable() {
   
           override def run(): Unit = {
             val doAsAction = new PrivilegedExceptionAction[Unit]() {
               override def run(): Unit = {
                 try {
                   execute()
                 } catch {
                   case e: HiveSQLException =>
                     setOperationException(e)
                     log.error("Error generating download file: ", e)
                 }
               }
             }
   
             try {
               sparkServiceUGI.doAs(doAsAction)
             } catch {
               case e: Exception =>
                 setOperationException(new HiveSQLException(e))
                 logError("Error generating download file as user : " +
                   sparkServiceUGI.getShortUserName(), e)
             }
           }
         }
         try {
           // This submit blocks if no background threads are available to run this operation
           val backgroundHandle =
             parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
           setBackgroundHandle(backgroundHandle)
         } catch {
           case rejected: RejectedExecutionException =>
             setState(OperationState.ERROR)
             throw new HiveSQLException("The background threadpool cannot accept" +
               " new task for execution, please retry the operation", rejected)
           case NonFatal(e) =>
             logError("Error generating download file in background", e)
             setState(OperationState.ERROR)
             throw new HiveSQLException(e)
         }
       }
     }
   
     private def execute(): Unit = {
       statementId = getHandle.getHandleIdentifier.getPublicId.toString
       setState(OperationState.RUNNING)
       try {
         // Always use the latest class loader provided by executionHive's state.
         val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
         // Use parent session's SessionState in this operation because such SessionState
         // keeps some shared info per session e.g. authorization information.
         SessionState.setCurrentSessionState(parentSession.getSessionState)
         Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
   
         sqlContext.sparkContext.listenerBus.
           post(StatementStart(statementId, System.currentTimeMillis(),
             sqlContext.sparkContext.getLocalProperty("spark.hive.session.id")))
         HiveThriftServer2.eventManager.onStatementStart(
           statementId,
           parentSession.getSessionHandle.getSessionId.toString,
           downloadQuery,
           statementId,
           parentSession.getUsername)
   
         assert(fetchSize >= 1L * 1024 * 1024 && fetchSize <= 20L * 1024 * 1024,
           s"fetchBlockSize(${fetchSize}) should be between 1M and 20M (inclusive).")
   
         if (StringUtils.isNotEmpty(tableName) && StringUtils.isNotEmpty(query)) {
           throw new HiveSQLException("Both table name and query are specified.")
         }
   
         sqlContext.sparkContext.setLocalProperty(
           SparkContext.SPARK_USER_RESOURCE_CONSUMER_ID, parentSession.getUserName)
         if (parentSession.getUserInfo != null) {
           sqlContext.sparkContext.setLocalProperty(
             SparkContext.SPARK_USER_RESOURCE_CONSUMER_PROFILE, parentSession.getUserInfo.profile)
         }
         sqlContext.sparkContext.setJobGroup(statementId, downloadQuery)
   
         logInfo(s"Running query [$statementId] in session " +
           s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD '$query'")
         val resultPath = writeData(new Path(pathPrefix, statementId))
   
         logQueryInfo(s"Running query [$statementId] in session " +
           s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD '$query'")
         val dataSize = fs.getContentSummary(resultPath).getLength
         logInfo(s"Try to download ${dataSize} bytes data from thriftserver.")
         totalDataSize = dataSize
   
         val list: JList[DownloadDataBlock] = new JArrayList[DownloadDataBlock]()
         // Add total data size to first row.
         list.add(DownloadDataBlock(schema = Some(schemaStr), dataSize = dataSize))
         // and then add data.
         fs.listStatus(resultPath, pathFilter).map(_.getPath).sortBy(_.getName).foreach { path =>
           val dataLen = fs.getFileStatus(path).getLen
           // Use BigDecimal for the ceiling division to avoid overflow
           val fetchBatches =
             BigDecimal(dataLen)./(BigDecimal(fetchSize)).setScale(0, RoundingMode.CEILING).longValue()
           assert(fetchBatches < Int.MaxValue, "The number of fetch batches is too large.")
   
           (0 until fetchBatches.toInt).foreach { i =>
             val fetchSizeInBatch = if (i == fetchBatches - 1) dataLen - i * fetchSize else fetchSize
             list.add(DownloadDataBlock(
               path = Some(path), offset = Some(i * fetchSize), dataSize = fetchSizeInBatch))
           }
   
           list.add(DownloadDataBlock(path = Some(path), dataSize = -1))
         }
   
         iter = list.iterator()
         logInfo(s"Add ${list.size()} data blocks to be fetched.")
   
         setState(OperationState.FINISHED)
         logQueryInfo(s"Finished query [$statementId].")
       } catch {
         case NonFatal(e) =>
           logQueryError(s"Error executing query [$statementId]", e)
           setState(OperationState.ERROR)
           HiveThriftServer2.eventManager.onStatementError(
             statementId, Utils.findFirstCause(e).toString, Utils.exceptionString(e))
           val exception = new HiveSQLException(e)
           setOperationException(exception)
       }
     }
   
     private def writeData(path: Path): Path = withRetry {
       result = (Option(tableName), Option(query), Option(format), Option(options)) match {
         case (Some(t), None, _, _) =>
           sqlContext.table(t)
         case (None, Some(q), _, _) =>
           sqlContext.sql(q)
         case _ =>
           throw new HiveSQLException(s"Invalid arguments: ($tableName, $query, $format, $options).")
       }
   
       schemaStr = result.schema.map(_.name).mkString(writeOptions("delimiter"))
       val needRepartition = result.queryExecution.sparkPlan match {
         case _: SortExec => false
         case _: TakeOrderedAndProjectExec => false
         case ProjectExec(_, _: SortExec) => false
         case AdaptiveSparkPlanExec(_: SortExec, _, _, _) => false
         case AdaptiveSparkPlanExec(_: TakeOrderedAndProjectExec, _, _, _) => false
         case AdaptiveSparkPlanExec(ProjectExec(_, _: SortExec), _, _, _) => false
         case _: ShuffleExchangeExec => false
         case ProjectExec(_, _: ShuffleExchangeExec) => false
         case _: CollectLimitExec => false
         case _: LimitExec => false
         case _ => true
       }
       // Background: according to the official Hadoop FileSystem API spec,
       // rename op's destination path must have a parent that exists,
       // otherwise we may get unexpected result on the rename API.
       // When downloading dataset as parquet format, if we configure a
       // quota-free path and adopt FileOutputCommitter V1 algorithm, we will
       // get the "IOException: Failed to rename FileStatus".
       // Hence, the parent path should exist (see CARMEL-5150).
       if (!fs.exists(path) && !fs.mkdirs(path)) {
         logWarning(s"Failed to create parent download path ${path}")
       }
       val step1Path = new Path(path, "step1")
       val outputFormat = Option(format).getOrElse("csv")
       val (castCols, readSchema) = if (outputFormat.equalsIgnoreCase("csv")) {
         // Support duplicate columns
         val names = result.schema.map(_.name)
         val renameDuplicateNames = if (names.distinct.length != names.length) {
           val duplicateColumns = names.groupBy(identity).collect {
             case (x, ys) if ys.length > 1 => x
           }
           result.logicalPlan.output.zipWithIndex.map {
             case (col, index) if duplicateColumns.exists(_.equals(col.name)) =>
               col.withName(col.name + index)
             case (col, _) => col
           }
         } else {
           result.logicalPlan.output
         }
         // Support Complex types for csv file
         val output = renameDuplicateNames.map { col =>
           col.dataType match {
             case BinaryType => Column(col).cast(StringType).alias(col.name)
             case NullType => Column(col).cast(StringType).alias(col.name)
             case CalendarIntervalType => Column(col).cast(StringType).alias(col.name)
             case ArrayType(_, _) => Column(col).cast(StringType).alias(col.name)
             case MapType(_, _, _) => Column(col).cast(StringType).alias(col.name)
             case StructType(_) => Column(col).cast(StringType).alias(col.name)
             case _ => Column(col).alias(col.name)
           }
         }
         (output, StructType(StructType.fromAttributes(renameDuplicateNames)
           .map(_.copy(dataType = StringType))))
       } else if (outputFormat.equalsIgnoreCase("parquet")) {
         val output = result.logicalPlan.output.map { col =>
           col.dataType match {
             case BooleanType | ByteType | ShortType | IntegerType
                  | LongType | FloatType | DoubleType | BinaryType => Column(col).alias(col.name)
             case _ => Column(col).cast(StringType).alias(col.name)
           }
         }
         val newSchema = result.schema.map(s => s.dataType match {
           case BooleanType | ByteType | ShortType | IntegerType
                | LongType | FloatType | DoubleType | BinaryType => s
           case _ => s.copy(dataType = StringType)
         })
         (output, StructType(newSchema))
       } else {
         val output = result.logicalPlan.output.map(col => Column(col).alias(col.name))
         (output, result.schema)
       }
   
       val writePlan = if (!needRepartition) {
         result.select(castCols: _*)
       } else if (numFiles.nonEmpty) {
         result.select(castCols: _*).repartition(numFiles.get)
       } else {
         result.select(castCols: _*).repartition(Column(Rand(1)))
       }
   
       writePlan.write
         .options(writeOptions)
         .format(outputFormat)
         .mode(SaveMode.Overwrite)
         .save(step1Path.toString)
   
       val contentSummary = fs.getContentSummary(step1Path)
       val dataSize = contentSummary.getLength
       if (dataSize > sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE)) {
         throw QueryLevelRestrictionErrors.downloadDataSizeExceeded(
           dataSize,
           sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE))
       }
   
       step1Path
     }
   
     // Limit download speed.
     private var lastFetchTime = System.currentTimeMillis()
     private var downloadedDataSize = 0L
   
     override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): RowSet = {
       val expectedMaxTime = math.min(maxFetchBlockTime,
         (downloadedDataSize.toDouble / (50L * 1024 * 1024)) * 1000L).toLong
       val downloadTime = System.currentTimeMillis() - lastFetchTime
       if (downloadTime < expectedMaxTime) {
         logInfo(s"Limit download speed ${downloadTime}, " +
           s"expected max download time ${expectedMaxTime}")
         Thread.sleep(expectedMaxTime - downloadTime)
       }
       lastFetchTime = System.currentTimeMillis()
   
       if (getStatus.getState ne OperationState.FINISHED) {
         throw getStatus.getOperationException
       }
       assertState(OperationState.FINISHED)
       validateFetchOrientation(orientation, JEnumSet.of(FetchOrientation.FETCH_NEXT))
   
       val rowSet: RowSet = RowSetFactory.create(getDownloadSchema, getProtocolVersion, false)
   
       if (!iter.hasNext) {
         rowSet
       } else {
         val maxRows = maxRowsL.toInt
         var curRow = 0
         while (curRow < maxRows && iter.hasNext) {
           val dataBlock = iter.next()
           val dataSize = dataBlock.dataSize
           dataBlock.path match {
             case Some(path) =>
               if (dataSize >= 0) {
                 val buffer: Array[Byte] = new Array[Byte](dataSize.toInt)
                 Utils.tryWithResource(fs.open(path)) { is =>
                   is.seek(dataBlock.offset.get)
                   is.readFully(buffer)
                 }
                 // data row
                 rowSet.addRow(Array[AnyRef](path.getName, buffer, null, Long.box(dataSize)))
                 downloadedDataSize = dataSize
               } else {
                 // End of file row
                 rowSet.addRow(Array[AnyRef](path.getName, null, null, Long.box(dataSize)))
               }
             case _ =>
               // Schema row and total data size row
               rowSet.addRow(Array[AnyRef](null, null, dataBlock.schema.get, Long.box(dataSize)))
           }
           curRow += 1
         }
         rowSet
       }
     }
   
     override def getResultSetSchema: TableSchema = {
       if (writeOptions.get("useRealSchema").exists(_.equalsIgnoreCase("true"))) {
         resultSchema
       } else {
         getDownloadSchema
       }
     }
   
     private def getDownloadSchema: TableSchema = {
       new TableSchema()
         .addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to be transferred.")
         .addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be transferred.")
         .addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to be transferred.")
         .addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be transferred in this fetch.")
     }
   
     override def cancel(): Unit = {
       if (statementId != null) {
         HiveThriftServer2.eventManager.onStatementCanceled(statementId)
       }
       HiveThriftServer2.eventManager.onQueryExist(
         statementId,
         QueryLogObjectList(Option(result).map(_.queryExecution)),
         QueryLogExtInfo(false, totalDataSize))
       cleanup(OperationState.CANCELED)
     }
   
     private def withRetry[T](f: => T): T = {
       val maxRetry = 2
       var retryNum = 0
   
       def retriable(t: Throwable): Boolean = {
         var cur = t
         while (retryNum < maxRetry && cur != null) {
           Utils.findFirstCause(cur) match {
             case f: FileNotFoundException if !f.getMessage.contains("shuffle_") =>
               // Some commands may fail while the dataset is being initialized, because
               // initialization triggers execution. In that case we need to build a
               // QueryExecution manually to get the optimized plan.
               val qe = if (result != null) {
                 result.queryExecution
               } else {
                 val parsed = sqlContext.sessionState.sqlParser.parsePlan(query)
                 new QueryExecution(sqlContext.sparkSession, parsed)
               }
               qe.optimizedPlan.foreach {
                 case LogicalRelation(_, _, Some(table), _) =>
                   qe.sparkSession.sessionState.refreshTable(table.identifier.toString)
                 case HiveTableRelation(tableMeta, _, _, _, _, _) =>
                   qe.sparkSession.sessionState.refreshTable(tableMeta.identifier.toString)
                 case _ =>
               }
               return true
             case _ => cur = cur.getCause
           }
         }
         false
       }
   
       var res: Option[T] = None
       do {
         if (retryNum > 0) {
           logInfo(s"Start to retry query $statementId.")
         }
         try {
           res = Some(f)
         } catch {
           case e if retriable(e) =>
             // e.getCause may be null when e is itself the root cause, so log the first cause.
             logError(s"Query $statementId failed with error " +
               Utils.findFirstCause(e).getMessage)
             retryNum += 1
           case e: Throwable =>
             throw e
         }
       } while (res.isEmpty)
       res.get
     }
   
     private def cleanup(state: OperationState): Unit = {
       setState(state)
       if (runInBackground) {
         val backgroundHandle = getBackgroundHandle()
         if (backgroundHandle != null) {
           backgroundHandle.cancel(true)
         }
       }
       if (statementId != null) {
         sqlContext.sparkContext.cancelJobGroup(statementId, Some("Clean up SparkDownloadData"))
         sqlContext.queryLoadLimitationManager.clean(statementId)
       }
   
       // Delete temp files
       try {
         fs.delete(pathPrefix, true)
       } catch {
         case e: IOException =>
           log.warn("Failed to remove download temp files.", e)
       }
       sqlContext.sparkContext.closeJobGroup(statementId)
     }
   }
   ```
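
The block-planning step in `execute()` above (split each result file into `fetchBlockSize` slices, then append a `-1` end-of-file marker) can be sketched on its own. This is a minimal Java illustration under stated assumptions: `BlockPlanner`, `Block`, and `plan` are hypothetical names, and plain `long` arithmetic replaces the `BigDecimal` ceiling division.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names; this mirrors the idea of the Scala example, not its API.
final class BlockPlanner {
    // One fetchable slice of a file. size == -1 marks end of file.
    static final class Block {
        final long offset;
        final long size;

        Block(long offset, long size) {
            this.offset = offset;
            this.size = size;
        }

        @Override
        public String toString() {
            return "Block(offset=" + offset + ", size=" + size + ")";
        }
    }

    // Splits a file of fileLength bytes into slices of at most fetchSize bytes.
    static List<Block> plan(long fileLength, long fetchSize) {
        if (fileLength < 0 || fetchSize <= 0) {
            throw new IllegalArgumentException("fileLength must be >= 0 and fetchSize > 0");
        }
        List<Block> blocks = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += fetchSize) {
            // The last slice carries whatever remains.
            blocks.add(new Block(offset, Math.min(fetchSize, fileLength - offset)));
        }
        // End-of-file marker; the offset is unused, so -1 is an arbitrary choice here.
        blocks.add(new Block(-1, -1));
        return blocks;
    }

    public static void main(String[] args) {
        System.out.println(plan(25, 10));
    }
}
```

Planning all blocks up front keeps each fetch call stateless apart from an iterator position: every data block already knows which file and offset to read.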

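The speed limit in `getNextRowSet` above works by comparing the time since the previous fetch with the time the last block should have taken at a capped rate, and sleeping for the difference. A minimal Java sketch of that calculation follows. `DownloadThrottle` and `delayMillis` are hypothetical names, and integer arithmetic is used instead of the `Double` division in the Scala code.

```java
// Hypothetical helper illustrating the throttling arithmetic only.
final class DownloadThrottle {
    // 50 MiB per second, the cap used in the example above.
    static final long BYTES_PER_SECOND_CAP = 50L * 1024 * 1024;

    // Milliseconds to wait before serving the next block; 0 if the client is already slow enough.
    static long delayMillis(long lastBlockBytes, long elapsedMillis, long maxFetchBlockTimeMillis) {
        // Minimum time the last block should have taken at the capped rate.
        long minDurationMillis = lastBlockBytes * 1000L / BYTES_PER_SECOND_CAP;
        // Never wait longer than the configured maximum, to avoid client timeouts.
        long expectedMillis = Math.min(maxFetchBlockTimeMillis, minDurationMillis);
        return Math.max(0L, expectedMillis - elapsedMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        long delay = delayMillis(10L * 1024 * 1024, 50, 30_000);
        System.out.println("sleeping for " + delay + " ms");
        Thread.sleep(delay);
    }
}
```

Keeping the calculation in a pure function separates it from `Thread.sleep`, which makes the limit easy to unit test.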

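On the client side, the rows produced by `getNextRowSet` above follow a small protocol: a header row carrying the schema and the total size, data rows carrying a file name and a chunk of bytes, and one row per file with size `-1` marking its end. A minimal Java sketch of a consumer follows. `DownloadAssembler` and `Row` are hypothetical names, and buffering whole files in memory is an assumption made for brevity; a real client would more likely stream each chunk to disk.

```java
import java.io.ByteArrayOutputStream;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side consumer of (FILE_NAME, DATA, SCHEMA, SIZE) rows.
final class DownloadAssembler {
    static final class Row {
        final String fileName;
        final byte[] data;
        final String schema;
        final long size;

        Row(String fileName, byte[] data, String schema, long size) {
            this.fileName = fileName;
            this.data = data;
            this.schema = schema;
            this.size = size;
        }
    }

    // Concatenates the chunks of each file, in arrival order, into one byte array per file.
    static Map<String, byte[]> assemble(List<Row> rows) {
        Map<String, ByteArrayOutputStream> open = new LinkedHashMap<>();
        Map<String, byte[]> done = new LinkedHashMap<>();
        for (Row row : rows) {
            if (row.fileName == null) {
                continue; // header row: schema and total size, no payload
            }
            if (row.size < 0) {
                // End-of-file marker: the file is complete (possibly empty).
                ByteArrayOutputStream buffer = open.remove(row.fileName);
                done.put(row.fileName, buffer == null ? new byte[0] : buffer.toByteArray());
            } else {
                open.computeIfAbsent(row.fileName, name -> new ByteArrayOutputStream())
                    .write(row.data, 0, row.data.length);
            }
        }
        return done;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row(null, null, "id,name", 5),
            new Row("part-0.csv", new byte[] {1, 2, 3}, null, 3),
            new Row("part-0.csv", new byte[] {4, 5}, null, 2),
            new Row("part-0.csv", null, null, -1));
        System.out.println(assemble(rows).get("part-0.csv").length);
    }
}
```
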


[GitHub] [hive] wangyum commented on a change in pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #2878:
URL: https://github.com/apache/hive/pull/2878#discussion_r784540945



##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path

Review comment:
       The use case is that the data may come from Twitter or Facebook and needs to be transformed before it is written to the table.






[GitHub] [hive] sunchao merged pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
sunchao merged pull request #2878:
URL: https://github.com/apache/hive/pull/2878


   




[GitHub] [hive] sunchao commented on pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
sunchao commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-1013721287


   Merged to master, thanks!




[GitHub] [hive] sunchao commented on pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
sunchao commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-1012398011


   Sorry for the delay, @wangyum. I'll take a look at this very soon.




[GitHub] [hive] wangyum commented on a change in pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #2878:
URL: https://github.com/apache/hive/pull/2878#discussion_r784489171



##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path
+
+  // The data to be transferred
+  4: required binary values
+}
+
+struct TUploadDataResp {
+  1: required TStatus status
+  2: required TOperationHandle operationHandle
+}
+
+// DownloadData()
+//
+// Download data to JDBC client.
+struct TDownloadDataReq {
+  // The session to download data
+  1: required TSessionHandle sessionHandle
+
+  // The download table name
+  2: optional TPatternOrIdentifier tableName
+
+  // The download query
+  3: optional string query

Review comment:
       Download data from a query. For example:
   ```sql
   SELECT * FROM t1 JOIN t2 ON t1.id = t2.id
   ```
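
Since `TDownloadDataReq` carries both an optional `tableName` and an optional `query`, a server has to decide which one to use. The Spark example earlier in this thread rejects requests that set both. A minimal Java sketch of that check follows; `DownloadSource` and `resolve` are hypothetical names, and turning a table name into `SELECT * FROM ...` is an assumption of this sketch, not something the Thrift definition specifies.

```java
// Hypothetical validation helper; not part of Hive's API.
final class DownloadSource {
    // Returns the SQL to run for a download request with exactly one source set.
    static String resolve(String tableName, String query) {
        boolean hasTable = tableName != null && !tableName.isEmpty();
        boolean hasQuery = query != null && !query.isEmpty();
        if (hasTable == hasQuery) {
            // Either both are set or neither is; both cases are ambiguous.
            throw new IllegalArgumentException("Specify exactly one of tableName or query.");
        }
        // Illustration only: a real server should resolve the table through its
        // catalog rather than concatenating an unchecked identifier into SQL.
        return hasQuery ? query : "SELECT * FROM " + tableName;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "SELECT * FROM t1 JOIN t2 ON t1.id = t2.id"));
        System.out.println(resolve("t1", null));
    }
}
```
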






[GitHub] [hive] wangyum commented on pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-1008464805


   cc @sunchao 




[GitHub] [hive] wangyum commented on pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-1013781726


   Thank you @sunchao 




[GitHub] [hive] wangyum commented on a change in pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #2878:
URL: https://github.com/apache/hive/pull/2878#discussion_r784552194



##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName

Review comment:
       Added it:
   https://github.com/apache/hive/pull/2878/files#diff-a97fa2c6efce03cd14924cbff678ef60b1846be6ff7f8275169a2f4d8799f16bR778






[GitHub] [hive] wangyum commented on a change in pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #2878:
URL: https://github.com/apache/hive/pull/2878#discussion_r784532458



##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path
+
+  // The data to be transferred
+  4: required binary values
+}
+
+struct TUploadDataResp {
+  1: required TStatus status
+  2: required TOperationHandle operationHandle
+}
+
+// DownloadData()
+//
+// Download data to JDBC client.
+struct TDownloadDataReq {
+  // The session to download data
+  1: required TSessionHandle sessionHandle
+
+  // The download table name
+  2: optional TPatternOrIdentifier tableName

Review comment:
       The requirement does not seem very strong, because JDBC mainly exposes tables, not paths.
   
   Besides, we can [run SQL on files directly](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#run-sql-on-files-directly), which is equivalent to supporting download from a path. For example:
   ```sql
   SELECT * FROM parquet.`/tmp/spark/parquet`
   ```
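
To make the equivalence concrete, here is a minimal sketch of how a path could be turned into a value for the `query` field. `PathQuery` and `forPath` are hypothetical names used only for illustration and are not part of this PR; the sketch assumes the server accepts the Spark-style `format.`path`` syntax shown above, and it rejects paths containing a backtick rather than trying to escape them.

```java
// Hypothetical helper, not part of the PR: builds a "SQL on files" query
// string that could be sent in the query field of a download request.
final class PathQuery {
    private PathQuery() {}

    // Returns e.g. SELECT * FROM parquet.`/tmp/spark/parquet`
    static String forPath(String format, String path) {
        if (format == null || !format.matches("[A-Za-z]+")) {
            throw new IllegalArgumentException("Unsupported format: " + format);
        }
        if (path == null || path.isEmpty() || path.indexOf('`') >= 0) {
            // Backticks would break the quoted identifier, so refuse them.
            throw new IllegalArgumentException("Invalid path: " + path);
        }
        return "SELECT * FROM " + format + ".`" + path + "`";
    }
}
```

With such a helper the client needs no separate `path` field on the download request; whether that trade-off is acceptable is the open question in this thread.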
   
   






[GitHub] [hive] sunchao commented on a change in pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #2878:
URL: https://github.com/apache/hive/pull/2878#discussion_r784369235



##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path
+
+  // The data to be transferred
+  4: required binary values
+}
+
+struct TUploadDataResp {
+  1: required TStatus status
+  2: required TOperationHandle operationHandle
+}
+
+// DownloadData()
+//
+// Download data to JDBC client.
+struct TDownloadDataReq {
+  // The session to download data
+  1: required TSessionHandle sessionHandle
+
+  // The download table name
+  2: optional TPatternOrIdentifier tableName
+
+  // The download query
+  3: optional string query
+
+  // The download file format
+  4: optional string format

Review comment:
       Curious: how is the file format used when downloading data?

##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName

Review comment:
       Should we make it more explicit and say that one of `tableName` or `path` must be set?
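
A minimal sketch of what such a check could look like on the server side, assuming "one of" means at least one of the two is set. `UploadTargetCheck` and `requireTarget` are hypothetical names, not part of this PR; a real implementation would presumably read the values from the generated `TUploadDataReq` (for example via its `isSetTableName()` and `isSetPath()` accessors) instead of taking plain strings.

```java
// Hypothetical helper, not part of the PR: enforces that an upload request
// names at least one destination (table or path).
final class UploadTargetCheck {
    private UploadTargetCheck() {}

    // True when the string is null or contains only whitespace.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Throws IllegalArgumentException when neither target is set.
    static void requireTarget(String tableName, String path) {
        if (isBlank(tableName) && isBlank(path)) {
            throw new IllegalArgumentException(
                "One of tableName or path must be set");
        }
    }
}
```

If exactly one of the two should be allowed, the condition would also need to reject the case where both are set.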

##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path
+
+  // The data to be transferred
+  4: required binary values
+}
+
+struct TUploadDataResp {
+  1: required TStatus status
+  2: required TOperationHandle operationHandle
+}
+
+// DownloadData()
+//
+// Download data to JDBC client.
+struct TDownloadDataReq {
+  // The session to download data
+  1: required TSessionHandle sessionHandle
+
+  // The download table name
+  2: optional TPatternOrIdentifier tableName
+
+  // The download query
+  3: optional string query

Review comment:
       What is this for?

##########
File path: service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
##########
@@ -36,56 +37,7 @@
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.TableSchema;
-import org.apache.hive.service.rpc.thrift.TCLIService;
-import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
-import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
-import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
-import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
-import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
-import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
-import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
-import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
-import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
-import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
-import org.apache.hive.service.rpc.thrift.TGetCatalogsReq;
-import org.apache.hive.service.rpc.thrift.TGetCatalogsResp;
-import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
-import org.apache.hive.service.rpc.thrift.TGetColumnsResp;
-import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
-import org.apache.hive.service.rpc.thrift.TGetCrossReferenceResp;
-import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
-import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
-import org.apache.hive.service.rpc.thrift.TGetInfoReq;
-import org.apache.hive.service.rpc.thrift.TGetInfoResp;
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
-import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
-import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
-import org.apache.hive.service.rpc.thrift.TGetQueryIdReq;
-import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
-import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
-import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
-import org.apache.hive.service.rpc.thrift.TGetSchemasResp;
-import org.apache.hive.service.rpc.thrift.TGetTableTypesReq;
-import org.apache.hive.service.rpc.thrift.TGetTableTypesResp;
-import org.apache.hive.service.rpc.thrift.TGetTablesReq;
-import org.apache.hive.service.rpc.thrift.TGetTablesResp;
-import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
-import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
-import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
-import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
-import org.apache.hive.service.rpc.thrift.TOperationHandle;
-import org.apache.hive.service.rpc.thrift.TProtocolVersion;
-import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TSetClientInfoReq;
-import org.apache.hive.service.rpc.thrift.TSetClientInfoResp;
-import org.apache.hive.service.rpc.thrift.TStatus;
-import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.*;

Review comment:
       nit: don't use star import.

##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path
+
+  // The data to be transferred
+  4: required binary values
+}
+
+struct TUploadDataResp {
+  1: required TStatus status
+  2: required TOperationHandle operationHandle
+}
+
+// DownloadData()
+//
+// Download data to JDBC client.
+struct TDownloadDataReq {
+  // The session to download data
+  1: required TSessionHandle sessionHandle
+
+  // The download table name
+  2: optional TPatternOrIdentifier tableName

Review comment:
       Should we also support download from a specific path?






[GitHub] [hive] wangyum commented on a change in pull request #2878: HIVE-24893: Add UploadData and DownloadData to TCLIService.thrift

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #2878:
URL: https://github.com/apache/hive/pull/2878#discussion_r784490241



##########
File path: service-rpc/if/TCLIService.thrift
##########
@@ -751,6 +751,52 @@ struct TGetTypeInfoResp {
   2: optional TOperationHandle operationHandle
 }
 
+// UploadData()
+//
+// UploadData data to table/path.
+struct TUploadDataReq {
+  // The session to execute the statement against
+  1: required TSessionHandle sessionHandle
+
+  // The table to be stored
+  2: optional string tableName
+
+  // The path to be stored
+  3: optional string path
+
+  // The data to be transferred
+  4: required binary values
+}
+
+struct TUploadDataResp {
+  1: required TStatus status
+  2: required TOperationHandle operationHandle
+}
+
+// DownloadData()
+//
+// Download data to JDBC client.
+struct TDownloadDataReq {
+  // The session to download data
+  1: required TSessionHandle sessionHandle
+
+  // The download table name
+  2: optional TPatternOrIdentifier tableName
+
+  // The download query
+  3: optional string query

Review comment:
       It downloads the result of a query. For example:
   ```sql
   SELECT * FROM t1 JOIN t2 ON t1.id = t2.id
   ```



