Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:43 UTC
[05/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
deleted file mode 100644
index cd629bf..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv
-
-import java.io.IOException
-
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import com.databricks.spark.csv.newapi.CarbonTextFile
-import com.databricks.spark.csv.util._
-import com.databricks.spark.sql.readers._
-import org.apache.commons.csv._
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.Path
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
-import org.apache.spark.sql.types._
-import org.slf4j.LoggerFactory
-
-import org.apache.carbondata.processing.etl.DataLoadingException
-
-case class CarbonCsvRelation protected[spark] (
- location: String,
- useHeader: Boolean,
- delimiter: Char,
- quote: Char,
- escape: Character,
- comment: Character,
- parseMode: String,
- parserLib: String,
- ignoreLeadingWhiteSpace: Boolean,
- ignoreTrailingWhiteSpace: Boolean,
- userSchema: StructType = null,
- charset: String = TextFile.DEFAULT_CHARSET.name(),
- inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
- extends BaseRelation with TableScan with InsertableRelation {
-
- /**
- * Limit the number of lines we'll search for a header row that isn't comment-prefixed.
- */
- private val MAX_COMMENT_LINES_IN_HEADER = 10
-
- private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass)
-
- // Parse mode flags
- if (!ParseModes.isValidMode(parseMode)) {
- logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
- }
-
- if ((ignoreLeadingWhiteSpace || ignoreTrailingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) {
- logger.warn(s"Ignore white space options may not work with Commons parserLib option")
- }
-
- private val failFast = ParseModes.isFailFastMode(parseMode)
- private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
- private val permissive = ParseModes.isPermissiveMode(parseMode)
-
- val schema = inferSchema()
-
- def tokenRdd(header: Array[String]): RDD[Array[String]] = {
-
- val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
-
- if(ParserLibs.isUnivocityLib(parserLib)) {
- univocityParseCSV(baseRDD, header)
- } else {
- val csvFormat = CSVFormat.DEFAULT
- .withDelimiter(delimiter)
- .withQuote(quote)
- .withEscape(escape)
- .withSkipHeaderRecord(false)
- .withHeader(header: _*)
- .withCommentMarker(comment)
-
- // If header is set, make sure firstLine is materialized before sending to executors.
- val filterLine = if (useHeader) firstLine else null
-
- baseRDD.mapPartitions { iter =>
- // When using header, any input line that equals firstLine is assumed to be a header
- val csvIter = if (useHeader) {
- iter.filter(_ != filterLine)
- } else {
- iter
- }
- parseCSV(csvIter, csvFormat)
- }
- }
- }
-
- // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
- def buildScan: RDD[Row] = {
- val schemaFields = schema.fields
- tokenRdd(schemaFields.map(_.name)).flatMap{ tokens =>
-
- if (dropMalformed && schemaFields.length != tokens.size) {
- logger.warn(s"Dropping malformed line: $tokens")
- None
- } else if (failFast && schemaFields.length != tokens.size) {
- throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens")
- } else {
- var index: Int = 0
- val rowArray = new Array[Any](schemaFields.length)
- try {
- index = 0
- while (index < schemaFields.length) {
- val field = schemaFields(index)
- rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable)
- index = index + 1
- }
- Some(Row.fromSeq(rowArray))
- } catch {
- case aiob: ArrayIndexOutOfBoundsException if permissive =>
- (index until schemaFields.length).foreach(ind => rowArray(ind) = null)
- Some(Row.fromSeq(rowArray))
- }
- }
- }
- }
-
- private def inferSchema(): StructType = {
- if (this.userSchema != null) {
- userSchema
- } else {
- val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) {
- val escapeVal = if (escape == null) '\\' else escape.charValue()
- val commentChar: Char = if (comment == null) '\0' else comment
- new LineCsvReader(fieldSep = delimiter, quote = quote, escape = escapeVal,
- commentMarker = commentChar).parseLine(firstLine)
- } else {
- val csvFormat = CSVFormat.DEFAULT
- .withDelimiter(delimiter)
- .withQuote(quote)
- .withEscape(escape)
- .withSkipHeaderRecord(false)
- CSVParser.parse(firstLine, csvFormat).getRecords.get(0).asScala.toArray
- }
- if(null == firstRow) {
- throw new DataLoadingException("First line of the csv is not valid.")
- }
- val header = if (useHeader) {
- firstRow
- } else {
- firstRow.zipWithIndex.map { case (value, index) => s"C$index"}
- }
- if (this.inferCsvSchema) {
- InferSchema(tokenRdd(header), header)
- } else {
- // By default fields are assumed to be StringType
- val schemaFields = header.map { fieldName =>
- StructField(fieldName.toString, StringType, nullable = true)
- }
- StructType(schemaFields)
- }
- }
- }
-
- /**
- * Returns the first line of the first non-empty file in path
- */
- private lazy val firstLine = {
- val csv = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
- if (comment == null) {
- csv.first()
- } else {
- csv.take(MAX_COMMENT_LINES_IN_HEADER)
- .find(x => !StringUtils.isEmpty(x) && !x.startsWith(comment.toString))
- .getOrElse(sys.error(s"No uncommented header line in " +
- s"first $MAX_COMMENT_LINES_IN_HEADER lines"))
- }
- }
-
- private def univocityParseCSV(
- file: RDD[String],
- header: Seq[String]): RDD[Array[String]] = {
- // If header is set, make sure firstLine is materialized before sending to executors.
- val filterLine = if (useHeader) firstLine else null
- val dataLines = if (useHeader) file.filter(_ != filterLine) else file
- val rows = dataLines.mapPartitionsWithIndex({
- case (split, iter) =>
- val escapeVal = if (escape == null) '\\' else escape.charValue()
- val commentChar: Char = if (comment == null) '\0' else comment
-
- new CarbonBulkCsvReader(iter, split,
- headers = header, fieldSep = delimiter,
- quote = quote, escape = escapeVal, commentMarker = commentChar,
- ignoreLeadingSpace = ignoreLeadingWhiteSpace,
- ignoreTrailingSpace = ignoreTrailingWhiteSpace)
- }, true)
- rows
- }
-
- private def parseCSV(
- iter: Iterator[String],
- csvFormat: CSVFormat): Iterator[Array[String]] = {
- iter.flatMap { line =>
- try {
- val records = CSVParser.parse(line, csvFormat).getRecords
- if (records.isEmpty) {
- logger.warn(s"Ignoring empty line: $line")
- None
- } else {
- Some(records.get(0).asScala.toArray)
- }
- } catch {
- case NonFatal(e) if !failFast =>
- logger.error(s"Exception while parsing line: $line. ", e)
- None
- }
- }
- }
-
- // The function below was borrowed from JSONRelation
- override def insert(data: DataFrame, overwrite: Boolean): Unit = {
- val filesystemPath = new Path(location)
- val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-
- if (overwrite) {
- try {
- fs.delete(filesystemPath, true)
- } catch {
- case e: IOException =>
- throw new IOException(
- s"Unable to clear output directory ${filesystemPath.toString} prior"
- + s" to INSERT OVERWRITE a CSV table:\n${e.toString}")
- }
- // Write the data. We assume that schema isn't changed, and we won't update it.
- data.saveAsCsvFile(location, Map("delimiter" -> delimiter.toString))
- } else {
- sys.error("CSV tables only support INSERT OVERWRITE for now.")
- }
- }
-}
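For reference, a minimal usage sketch of how this relation was typically exercised through the DataFrame reader API. The format name and option keys mirror what DefaultSource (removed later in this commit) parses; the SQLContext instance and the input path are placeholder assumptions, not part of the original file.

// Hypothetical usage sketch only; sqlContext and the path are assumptions.
val df = sqlContext.read
  .format("com.databricks.spark.csv.newapi")
  .option("header", "true")        // useHeader
  .option("delimiter", ",")        // single character only
  .option("quote", "\"")
  .option("comment", "#")
  .option("mode", "PERMISSIVE")    // or DROPMALFORMED / FAILFAST
  .option("inferSchema", "false")  // columns default to StringType
  .load("hdfs:///tmp/sample.csv")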
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
deleted file mode 100644
index 3589823..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv.newapi
-
-import java.nio.charset.Charset
-
-import com.databricks.spark.csv.util.TextFile
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.{NewHadoopRDD, RDD}
-
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-
-/**
- * Creates an RDD using CarbonDataLoadInputFormat
- */
-private[csv] object CarbonTextFile {
-
- private def newHadoopRDD(sc: SparkContext, location: String) = {
- val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
- hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
- hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
- hadoopConfiguration.set("io.compression.codecs",
- """org.apache.hadoop.io.compress.GzipCodec,
- org.apache.hadoop.io.compress.DefaultCodec,
- org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
-
- CarbonDataRDDFactory.configSplitMaxSize(sc, location, hadoopConfiguration)
- new NewHadoopRDD[LongWritable, Text](
- sc,
- classOf[TextInputFormat],
- classOf[LongWritable],
- classOf[Text],
- hadoopConfiguration).setName("newHadoopRDD-spark-csv")
- }
-
- def withCharset(sc: SparkContext, location: String, charset: String): RDD[String] = {
- if (Charset.forName(charset) == TextFile.DEFAULT_CHARSET) {
- newHadoopRDD(sc, location).map(pair => pair._2.toString)
- } else {
- // can't pass a Charset object here because it's not serializable
- // TODO: maybe use mapPartitions instead?
- newHadoopRDD(sc, location).map(
- pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))
- }
- }
-}
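As a side note, the charset branch above decodes the Hadoop Text buffer directly rather than copying the full backing array; a minimal standalone sketch of that decoding step (the method name here is illustrative, not part of the original file):

import org.apache.hadoop.io.Text

// Decode only the valid portion of the Text backing array, as withCharset does.
def decode(text: Text, charset: String): String =
  new String(text.getBytes, 0, text.getLength, charset)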
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
deleted file mode 100644
index cd76651..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv.newapi
-
-import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
-import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
-
-/**
- * Provides access to CSV data from pure SQL statements (i.e. for users of the
- * JDBC server).
- */
-class DefaultSource
- extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
-
- private def checkPath(parameters: Map[String, String]): String = {
- parameters.getOrElse("path", sys.error("'path' must be specified for CSV data."))
- }
-
- /**
- * Creates a new relation for data stored as CSV, given parameters.
- * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
- */
- override def createRelation(sqlContext: SQLContext,
- parameters: Map[String, String]): BaseRelation = {
- createRelation(sqlContext, parameters, null)
- }
-
- /**
- * Creates a new relation for data stored as CSV, given parameters and a user-supplied schema.
- * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
- */
- override def createRelation(
- sqlContext: SQLContext,
- parameters: Map[String, String],
- schema: StructType): BaseRelation = {
- val path = checkPath(parameters)
- val delimiter = TypeCast.toChar(parameters.getOrElse("delimiter", ","))
-
- val quote = parameters.getOrElse("quote", "\"")
- val quoteChar = if (quote.length == 1) {
- quote.charAt(0)
- } else {
- throw new Exception("Quotation cannot be more than one character.")
- }
-
- val escape = parameters.getOrElse("escape", null)
- val escapeChar: Character = if (escape == null || (escape.length == 0)) {
- null
- } else if (escape.length == 1) {
- escape.charAt(0)
- } else {
- throw new Exception("Escape character cannot be more than one character.")
- }
-
- val comment = parameters.getOrElse("comment", "#")
- val commentChar: Character = if (comment == null) {
- null
- } else if (comment.length == 1) {
- comment.charAt(0)
- } else {
- throw new Exception("Comment marker cannot be more than one character.")
- }
-
- val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
-
- val useHeader = parameters.getOrElse("header", "false")
- val headerFlag = if (useHeader == "true") {
- true
- } else if (useHeader == "false") {
- false
- } else {
- throw new Exception("Header flag can be true or false")
- }
-
- val parserLib = parameters.getOrElse("parserLib", ParserLibs.DEFAULT)
- val ignoreLeadingWhiteSpace = parameters.getOrElse("ignoreLeadingWhiteSpace", "false")
- val ignoreLeadingWhiteSpaceFlag = if (ignoreLeadingWhiteSpace == "false") {
- false
- } else if (ignoreLeadingWhiteSpace == "true") {
- if (!ParserLibs.isUnivocityLib(parserLib)) {
- throw new Exception("Ignore whitesspace supported for Univocity parser only")
- }
- true
- } else {
- throw new Exception("Ignore white space flag can be true or false")
- }
- val ignoreTrailingWhiteSpace = parameters.getOrElse("ignoreTrailingWhiteSpace", "false")
- val ignoreTrailingWhiteSpaceFlag = if (ignoreTrailingWhiteSpace == "false") {
- false
- } else if (ignoreTrailingWhiteSpace == "true") {
- if (!ParserLibs.isUnivocityLib(parserLib)) {
- throw new Exception("Ignore whitespace supported for the Univocity parser only")
- }
- true
- } else {
- throw new Exception("Ignore white space flag can be true or false")
- }
-
- val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name())
- // TODO validate charset?
-
- val inferSchema = parameters.getOrElse("inferSchema", "false")
- val inferSchemaFlag = if (inferSchema == "false") {
- false
- } else if (inferSchema == "true") {
- true
- } else {
- throw new Exception("Infer schema flag can be true or false")
- }
-
- CarbonCsvRelation(path,
- headerFlag,
- delimiter,
- quoteChar,
- escapeChar,
- commentChar,
- parseMode,
- parserLib,
- ignoreLeadingWhiteSpaceFlag,
- ignoreTrailingWhiteSpaceFlag,
- schema,
- charset,
- inferSchemaFlag)(sqlContext)
- }
-
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- val path = checkPath(parameters)
- val filesystemPath = new Path(path)
- val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val doSave = if (fs.exists(filesystemPath)) {
- mode match {
- case SaveMode.Append =>
- sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
- case SaveMode.Overwrite =>
- fs.delete(filesystemPath, true)
- true
- case SaveMode.ErrorIfExists =>
- sys.error(s"path $path already exists.")
- case SaveMode.Ignore => false
- }
- } else {
- true
- }
-
- val codec: Class[_ <: CompressionCodec] =
- parameters.getOrElse("codec", "none") match {
- case "gzip" => classOf[GzipCodec]
- case _ => null
- }
-
- if (doSave) {
- // Only save data when the save mode is not ignore.
- data.saveAsCsvFile(path, parameters, codec)
- }
-
- createRelation(sqlContext, parameters, data.schema)
- }
-}
-
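For the "pure SQL statements" path mentioned in the provider's scaladoc, a hedged sketch of registering the source from Scala; the table name and path are placeholders, and the accepted OPTIONS are exactly the keys parsed in createRelation above.

// Illustrative only; assumes an active SQLContext.
sqlContext.sql(
  """CREATE TEMPORARY TABLE csv_src
    |USING com.databricks.spark.csv.newapi
    |OPTIONS (path "hdfs:///tmp/sample.csv", header "true", delimiter ",")
  """.stripMargin)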
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
deleted file mode 100644
index 5a02bfd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-
-class CarbonCleanFilesRDD[V: ClassTag](
- sc: SparkContext,
- valueClass: Value[V],
- databaseName: String,
- tableName: String,
- partitioner: Partitioner)
- extends RDD[V](sc, Nil) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
- }
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
- val iter = new Iterator[(V)] {
- val split = theSplit.asInstanceOf[CarbonLoadPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- // TODO call CARBON delete API
-
-
- var havePair = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = true
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- valueClass.getValue(null)
- }
-
- }
- iter
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonLoadPartition]
- val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name: " + s.head + s.length)
- s
- }
-}
-
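The compute() method above builds a one-shot iterator by hand with the havePair/finished flags; a minimal generic sketch of that single-element pattern (purely illustrative, not part of the codebase):

def singleElementIterator[A](value: => A): Iterator[A] = new Iterator[A] {
  private var consumed = false
  override def hasNext: Boolean = !consumed
  override def next(): A = {
    if (consumed) throw new java.util.NoSuchElementException("End of stream")
    consumed = true
    value // evaluated on the single successful next() call
  }
}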
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
deleted file mode 100644
index e306a89..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ /dev/null
@@ -1,604 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.carbondata.spark.rdd
-
-import java.lang.Long
-import java.text.SimpleDateFormat
-import java.util
-import java.util.UUID
-
-import scala.collection.JavaConverters._
-import scala.util.Random
-
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.DataLoadCoalescedRDD
-import org.apache.spark.rdd.DataLoadPartitionWrap
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.Row
-import org.apache.spark.util.TaskContextUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.StandardLogService
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.processing.constants.DataProcessorConstants
-import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
-import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.graphgenerator.GraphGenerator
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This partition class is used to split by TableSplit
- *
- */
-class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit,
- val blocksDetails: Array[BlockDetails])
- extends Partition {
-
- override val index: Int = idx
- val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
- val partitionBlocksDetail = blocksDetails
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * This partition class is used to split by host
- *
- */
-class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
- val blocksDetails: Array[BlockDetails])
- extends Partition {
-
- override val index: Int = idx
- val serializableHadoopSplit = host
- val nodeBlocksDetail = blocksDetails
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-class SparkPartitionLoader(model: CarbonLoadModel,
- splitIndex: Int,
- storePath: String,
- kettleHomePath: String,
- loadCount: Int,
- loadMetadataDetails: LoadMetadataDetails) {
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- var storeLocation: String = ""
-
- def initialize(): Unit = {
- val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
- if (null == carbonPropertiesFilePath) {
- System.setProperty("carbon.properties.filepath",
- System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
- }
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
- CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
- CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
- CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
- CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
- CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
- CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
- CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
- CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
-
- // this property is used to determine whether the temp location for carbon is inside
- // the container temp dir or the yarn application directory.
- val carbonUseLocalDir = CarbonProperties.getInstance()
- .getProperty("carbon.use.local.dir", "false")
- if (carbonUseLocalDir.equalsIgnoreCase("true")) {
- val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.nonEmpty) {
- storeLocation = storeLocations(Random.nextInt(storeLocations.length))
- }
- if (storeLocation == null) {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- } else {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
- }
-
- def run(): Unit = {
- try {
- CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
- kettleHomePath)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- } catch {
- case e: DataLoadingException => if (e.getErrorCode ==
- DataProcessorConstants.BAD_REC_FOUND) {
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
- LOGGER.info("Bad Record Found")
- } else {
- throw e
- }
- case e: Exception =>
- throw e
- } finally {
- // delete temp location data
- try {
- val isCompaction = false
- CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, isCompaction)
- } catch {
- case e: Exception =>
- LOGGER.error(e, "Failed to delete local data")
- }
- if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
- loadMetadataDetails.getLoadStatus)) {
- if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
- .equals(loadMetadataDetails.getLoadStatus)) {
- LOGGER.info("DataLoad complete")
- LOGGER.info("Data Load partially successful with LoadCount:" + loadCount)
- } else {
- LOGGER.info("DataLoad complete")
- LOGGER.info("Data Loaded successfully with LoadCount:" + loadCount)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
- model.getPartitionId)
- }
- }
- }
- }
-}
-
-/**
- * Use this RDD class to load a CSV data file
- *
- * @param sc The SparkContext to associate the RDD with.
- * @param result Output result
- * @param carbonLoadModel Carbon load model which contain the load info
- * @param storePath The store location
- * @param kettleHomePath The kettle home path
- * @param partitioner Partitioner which specifies how to partition
- * @param columinar whether the store is columnar
- * @param loadCount Current load count
- * @param tableCreationTime Time of creating table
- * @param schemaLastUpdatedTime Time of last schema update
- * @param blocksGroupBy Blocks array grouped by partition or host
- * @param isTableSplitPartition Whether using table split partition
- * @tparam K Class of the key associated with the Result.
- * @tparam V Class of the value associated with the Result.
- */
-class DataFileLoaderRDD[K, V](
- sc: SparkContext,
- result: DataLoadResult[K, V],
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- kettleHomePath: String,
- partitioner: Partitioner,
- columinar: Boolean,
- loadCount: Integer,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
- blocksGroupBy: Array[(String, Array[BlockDetails])],
- isTableSplitPartition: Boolean) extends RDD[(K, V)](sc, Nil) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- if (isTableSplitPartition) {
- // for table split partition
- var splits = Array[TableSplit]()
- if (carbonLoadModel.isDirectLoad) {
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
- } else {
- splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null)
- }
-
- splits.zipWithIndex.map { case (split, index) =>
- // filter by the partition unique id; only one will match, so take element 0
- val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
- uniqueId == split.getPartition.getUniqueID
- }(0)._2
- new CarbonTableSplitPartition(id, index, split, blocksDetails)
- }
- } else {
- // for node partition
- blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
- new CarbonNodePartition(id, index, uniqueId, blockDetails)
- }
- }
- }
-
- override def checkpoint() {
- // Do nothing. Hadoop RDD should not be checkpointed.
- }
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val iter = new Iterator[(K, V)] {
- var partitionID = "0"
- val loadMetadataDetails = new LoadMetadataDetails()
- var model: CarbonLoadModel = _
- var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
- theSplit.index
- try {
- loadMetadataDetails.setPartitionCount(partitionID)
- carbonLoadModel.setSegmentId(String.valueOf(loadCount))
- setModelAndBlocksInfo()
- val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
- kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize
- if (model.isRetentionRequest) {
- recreateAggregationTableForRetention
- } else if (model.isAggLoadRequest) {
- loadMetadataDetails.setLoadStatus(createManualAggregateTable)
- } else {
- loader.run()
- }
- } catch {
- case e: Exception =>
- logInfo("DataLoad failure")
- LOGGER.error(e)
- throw e
- }
-
- def setModelAndBlocksInfo(): Unit = {
- if (isTableSplitPartition) {
- // for table split partition
- val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setBlocksID(blocksID)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- if (carbonLoadModel.isDirectLoad) {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID,
- split.serializableHadoopSplit.value.getPartition.getFilesPath,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID)
- }
- partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
- // get this partition's data blocks and put them into the global static map
- GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
- StandardLogService.setThreadName(partitionID, null)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
- partitionID, split.partitionBlocksDetail.length)
- } else {
- // for node partition
- val split = theSplit.asInstanceOf[CarbonNodePartition]
- logInfo("Input split: " + split.serializableHadoopSplit)
- logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
- split.serializableHadoopSplit, split.nodeBlocksDetail.length)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setBlocksID(blocksID)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- // set this node's block info into the global static map
- GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
- if (carbonLoadModel.isDirectLoad) {
- val filelist: java.util.List[String] = new java.util.ArrayList[String](
- CarbonCommonConstants.CONSTANT_SIZE_TEN)
- CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
- model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(partitionID)
- }
- StandardLogService.setThreadName(blocksID, null)
- }
- }
-
- /**
- * generate blocks id
- *
- * @return
- */
- def gernerateBlocksID: String = {
- if (isTableSplitPartition) {
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
- .getPartition.getUniqueID + "_" + UUID.randomUUID()
- } else {
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- UUID.randomUUID()
- }
- }
-
- def checkAndLoadAggregationTable: String = {
- val schema = model.getCarbonDataLoadSchema
- val aggTables = schema.getCarbonTable.getAggregateTablesName
- if (null != aggTables && !aggTables.isEmpty) {
- val details = model.getLoadMetadataDetails.asScala.toArray
- val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
- var listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
- listOfLoadFolders = CarbonLoaderUtil.addNewSliceNameToList(newSlice, listOfLoadFolders)
- val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
- var listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
- listOfAllLoadFolders = CarbonLoaderUtil
- .addNewSliceNameToList(newSlice, listOfAllLoadFolders)
- val copyListOfLoadFolders = listOfLoadFolders.asScala.toList
- val copyListOfUpdatedLoadFolders = listOfUpdatedLoadFolders.asScala.toList
- loadTableSlices(listOfAllLoadFolders, details)
- val loadFolders = Array[String]()
- loadMetadataDetails.setLoadStatus(iterateOverAggTables(aggTables,
- copyListOfLoadFolders.asJava, copyListOfUpdatedLoadFolders.asJava, loadFolders))
- if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
- loadMetadataDetails.getLoadStatus)) {
- // remove the current slice from memory not the table
- CarbonLoaderUtil
- .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
- logInfo(s"Aggregate table creation failed")
- } else {
- logInfo("Aggregate tables creation successfull")
- }
- }
- loadMetadataDetails.getLoadStatus
- }
-
- def loadTableSlices(listOfAllLoadFolders: java.util.List[String],
- loadMetadataDetails: Array[LoadMetadataDetails]) = {
- CarbonProperties.getInstance().addProperty("carbon.cache.used", "false")
- // TODO: Implement it
- }
-
- def createManualAggregateTable: String = {
- val details = model.getLoadMetadataDetails.asScala.toArray
- val listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
- val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
- val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
- loadTableSlices(listOfAllLoadFolders, details)
- val loadFolders = Array[String]()
- val aggTable = model.getAggTableName
- loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
- listOfUpdatedLoadFolders, loadFolders))
- if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
- loadMetadataDetails.getLoadStatus)) {
- logInfo(s"Aggregate table creation failed :: $aggTable")
- } else {
- logInfo(s"Aggregate table creation successfull :: $aggTable")
- }
- loadMetadataDetails.getLoadStatus
- }
-
- def recreateAggregationTableForRetention = {
- val schema = model.getCarbonDataLoadSchema
- val aggTables = schema.getCarbonTable.getAggregateTablesName
- if (null != aggTables && !aggTables.isEmpty) {
- val details = model.getLoadMetadataDetails.asScala.toArray
- val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
- val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
- val listOfAllLoadFolder = CarbonQueryUtil.getListOfSlices(details)
- loadTableSlices(listOfAllLoadFolder, details)
- val loadFolders = Array[String]()
- iterateOverAggTables(aggTables, listOfLoadFolders, listOfUpdatedLoadFolders, loadFolders)
- }
- }
-
- // TODO Aggregate table needs to be handled
- def iterateOverAggTables(aggTables: java.util.List[String],
- listOfLoadFolders: java.util.List[String],
- listOfUpdatedLoadFolders: java.util.List[String],
- loadFolders: Array[String]): String = {
- model.setAggLoadRequest(true)
- aggTables.asScala.foreach { aggTable =>
- model.setAggTableName(aggTable)
- loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
- listOfUpdatedLoadFolders, loadFolders))
- if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
- loadMetadataDetails.getLoadStatus)) {
- logInfo(s"Aggregate table creation failed :: aggTable")
- return loadMetadataDetails.getLoadStatus
- }
- }
- loadMetadataDetails.getLoadStatus
- }
-
- def loadAggregationTable(listOfLoadFolders: java.util.List[String],
- listOfUpdatedLoadFolders: java.util.List[String],
- loadFolders: Array[String]): String = {
- // TODO: Implement it
- loadMetadataDetails.getLoadStatus
- }
-
- var finished = false
-
- override def hasNext: Boolean = {
- !finished
- }
-
- override def next(): (K, V) = {
- finished = true
- result.getKey(uniqueLoadStatusId, loadMetadataDetails)
- }
- }
- iter
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- if (isTableSplitPartition) {
- // for table split partition
- val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
- val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
- location
- } else {
- // for node partition
- val theSplit = split.asInstanceOf[CarbonNodePartition]
- val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
- logInfo("Preferred Location for split: " + firstOptionLocation.head)
- val blockMap = new util.LinkedHashMap[String, Integer]()
- val tableBlocks = theSplit.blocksDetails
- tableBlocks.foreach { tableBlock =>
- tableBlock.getLocations.foreach { location =>
- if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
- val currentCount = blockMap.get(location)
- if (currentCount == null) {
- blockMap.put(location, 1)
- } else {
- blockMap.put(location, currentCount + 1)
- }
- }
- }
- }
-
- val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
- nodeCount1.getValue > nodeCount2.getValue
- }
- )
-
- val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
- firstOptionLocation ++ sortedNodesList
- }
- }
-
-}
-
-/**
- * Use this RDD class to load data from an RDD of rows
- *
- * @param sc
- * @param result
- * @param carbonLoadModel
- * @param storePath
- * @param kettleHomePath
- * @param columinar
- * @param loadCount
- * @param tableCreationTime
- * @param schemaLastUpdatedTime
- * @param prev
- * @tparam K
- * @tparam V
- */
-class DataFrameLoaderRDD[K, V](
- sc: SparkContext,
- result: DataLoadResult[K, V],
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- kettleHomePath: String,
- columinar: Boolean,
- loadCount: Integer,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
- prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- @DeveloperApi
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val resultIter = new Iterator[(K, V)] {
- var partitionID = "0"
- val loadMetadataDetails = new LoadMetadataDetails()
- var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
- theSplit.index
- try {
- loadMetadataDetails.setPartitionCount(partitionID)
- carbonLoadModel.setPartitionId(partitionID)
- carbonLoadModel.setSegmentId(String.valueOf(loadCount))
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
- kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize
- val rddIteratorKey = UUID.randomUUID().toString
- try {
- RddInputUtils.put(rddIteratorKey,
- new PartitionIterator(
- firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
- carbonLoadModel,
- context))
- carbonLoadModel.setRddIteratorKey(rddIteratorKey)
- loader.run()
- } finally {
- RddInputUtils.remove(rddIteratorKey)
- }
- } catch {
- case e: Exception =>
- logInfo("DataLoad failure")
- LOGGER.error(e)
- throw e
- }
-
- var finished = false
-
- override def hasNext: Boolean = !finished
-
- override def next(): (K, V) = {
- finished = true
- result.getKey(uniqueLoadStatusId, loadMetadataDetails)
- }
- }
- resultIter
- }
-
- override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
-}
-
-class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
- carbonLoadModel: CarbonLoadModel,
- context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
- def hasNext: Boolean = partitionIter.hasNext
- def next: JavaRddIterator[Array[String]] = {
- val value = partitionIter.next
- new RddIterator(value.rdd.iterator(value.partition, context),
- carbonLoadModel,
- context)
- }
- def initialize: Unit = {
- TaskContextUtil.setTaskContext(context)
- }
-}
-/**
- * This class wraps a Scala Iterator as a Java Iterator.
- * It also converts all columns to string data to use the CSV data loading flow.
- *
- * @param rddIter
- * @param carbonLoadModel
- * @param context
- */
-class RddIterator(rddIter: Iterator[Row],
- carbonLoadModel: CarbonLoadModel,
- context: TaskContext) extends JavaRddIterator[Array[String]] {
-
- val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
- .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- val format = new SimpleDateFormat(formatString)
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
- val serializationNullFormat =
- carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- def hasNext: Boolean = rddIter.hasNext
-
- def next: Array[String] = {
- val row = rddIter.next()
- val columns = new Array[String](row.length)
- for (i <- 0 until columns.length) {
- columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, format)
- }
- columns
- }
-
- def initialize: Unit = {
- TaskContextUtil.setTaskContext(context)
- }
-
-}
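getPreferredLocations in DataFileLoaderRDD above ranks the remaining hosts by how many of the split's blocks they hold and keeps the two best besides the node the partition was built for; an illustrative plain-Scala reduction of that heuristic (the method and parameter names are assumptions):

def preferredHosts(primary: String, blockLocations: Seq[Array[String]]): Seq[String] = {
  // count how many blocks each non-primary host holds
  val counts = blockLocations.flatten
    .filterNot(_.equalsIgnoreCase(primary))
    .groupBy(identity)
    .map { case (host, occurrences) => host -> occurrences.size }
  // primary host first, then the two hosts with the most local blocks
  Seq(primary) ++ counts.toSeq.sortBy { case (_, count) => -count }.map(_._1).take(2)
}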
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c30ead7..a750493 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.Random
import scala.util.control.Breaks._
+import com.databricks.spark.csv.newapi.CarbonTextFile
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
@@ -32,9 +33,9 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel}
import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.util.{FileUtils, SplitUtils}
+import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
@@ -43,7 +44,6 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.etl.DataLoadingException
@@ -51,11 +51,10 @@ import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionCallable, CompactionType}
import org.apache.carbondata.spark.partition.api.Partition
import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
-
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil, LoadMetadataUtil}
/**
* This is the factory class which can create different RDD depends on user needs.
@@ -164,30 +163,6 @@ object CarbonDataRDDFactory {
}
}
- def configSplitMaxSize(context: SparkContext, filePaths: String,
- hadoopConfiguration: Configuration): Unit = {
- val defaultParallelism = if (context.defaultParallelism < 1) {
- 1
- } else {
- context.defaultParallelism
- }
- val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
- val blockSize =
- hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
- LOGGER.info("[Block Distribution]")
- // calculate new block size to allow use all the parallelism
- if (spaceConsumed < defaultParallelism * blockSize) {
- var newSplitSize: Long = spaceConsumed / defaultParallelism
- if (newSplitSize < CarbonCommonConstants.CARBON_16MB) {
- newSplitSize = CarbonCommonConstants.CARBON_16MB
- }
- hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
- LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " +
- s"defaultParallelism: $defaultParallelism")
- LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
- }
- }
-
def alterTableForCompaction(sqlContext: SQLContext,
alterTableModel: AlterTableModel,
carbonLoadModel: CarbonLoadModel,
@@ -206,11 +181,11 @@ object CarbonDataRDDFactory {
LOGGER.audit(s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val tableCreationTime = CarbonEnv.get.carbonMetastore
.getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
if (null == carbonLoadModel.getLoadMetadataDetails) {
- readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
}
// reading the start time of data load.
val loadStartTime = CarbonLoaderUtil.readCurrentTime()
@@ -384,7 +359,7 @@ object CarbonDataRDDFactory {
// scan again and determine if anything is there to merge again.
- readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
segList = carbonLoadModel.getLoadMetadataDetails
// in case of major compaction we will scan only once and come out as it will keep
// on doing major for the new loads also.
@@ -452,7 +427,7 @@ object CarbonDataRDDFactory {
compactionLock: ICarbonLock): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
- readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
var segList: util.List[LoadMetadataDetails] = carbonLoadModel.getLoadMetadataDetails
// clean up of the stale segments.
@@ -493,9 +468,8 @@ object CarbonDataRDDFactory {
LOGGER.info("System level compaction lock is enabled.")
val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
var tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
- .tablesMeta.toArray, skipCompactionTables.toList.asJava
- )
+ .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.tablesMeta.toArray,
+ skipCompactionTables.toList.asJava)
while (null != tableForCompaction) {
LOGGER.info("Compaction request has been identified for table " +
s"${ tableForCompaction.carbonTable.getDatabaseName }." +
@@ -506,7 +480,7 @@ object CarbonDataRDDFactory {
val newCarbonLoadModel = new CarbonLoadModel()
prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
- val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val tableCreationTime = CarbonEnv.get.carbonMetastore
.getTableCreationTime(newCarbonLoadModel.getDatabaseName,
newCarbonLoadModel.getTableName
)
@@ -548,7 +522,7 @@ object CarbonDataRDDFactory {
}
// ********* check again for all the tables.
tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+ .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
.tablesMeta.toArray, skipCompactionTables.asJava
)
}
@@ -581,7 +555,7 @@ object CarbonDataRDDFactory {
newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
newCarbonLoadModel.setStorePath(table.getStorePath)
- readLoadMetadataDetails(newCarbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
val loadStartTime = CarbonLoaderUtil.readCurrentTime()
newCarbonLoadModel.setFactTimeStamp(loadStartTime)
}
@@ -604,7 +578,6 @@ object CarbonDataRDDFactory {
carbonLoadModel: CarbonLoadModel,
storePath: String,
kettleHomePath: String,
- partitioner: Partitioner,
columinar: Boolean,
partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
useKettle: Boolean,
@@ -703,7 +676,7 @@ object CarbonDataRDDFactory {
deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
isForceDeletion = false)
if (null == carbonLoadModel.getLoadMetadataDetails) {
- readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
}
var currentLoadCount = -1
@@ -741,9 +714,9 @@ object CarbonDataRDDFactory {
// reading the start time of data load.
val loadStartTime = CarbonLoaderUtil.readCurrentTime()
carbonLoadModel.setFactTimeStamp(loadStartTime)
- val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val tableCreationTime = CarbonEnv.get.carbonMetastore
.getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
.getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
// get partition way from configuration
@@ -776,7 +749,7 @@ object CarbonDataRDDFactory {
if (pathBuilder.nonEmpty) {
pathBuilder.substring(0, pathBuilder.size - 1)
}
- (split.getPartition.getUniqueID, SplitUtils.getSplits(pathBuilder.toString(),
+ (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
sqlContext.sparkContext
))
}
@@ -795,7 +768,7 @@ object CarbonDataRDDFactory {
}
pathBuilder.append(split.getPartition.getUniqueID).append("/")
(split.getPartition.getUniqueID,
- SplitUtils.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+ SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
}
}
} else {
@@ -817,7 +790,7 @@ object CarbonDataRDDFactory {
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
- configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
+ CarbonTextFile.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
inputFormat match {
@@ -883,7 +856,6 @@ object CarbonDataRDDFactory {
carbonLoadModel,
storePath,
kettleHomePath,
- partitioner,
columinar,
currentLoadCount,
tableCreationTime,
@@ -895,7 +867,6 @@ object CarbonDataRDDFactory {
status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
- partitioner,
currentLoadCount,
blocksGroupBy,
isTableSplitPartition).collect()
@@ -930,8 +901,7 @@ object CarbonDataRDDFactory {
}
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
- carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
- partitioner.partitionCount, currentLoadCount.toString)
+ carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
var errorMessage: String = "DataLoad failure"
var executorMessage: String = ""
@@ -1034,17 +1004,11 @@ object CarbonDataRDDFactory {
}
- def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
- val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
- val details = SegmentStatusManager.readLoadMetadata(metadataPath)
- model.setLoadMetadataDetails(details.toList.asJava)
- }
-
def deleteLoadsAndUpdateMetadata(
carbonLoadModel: CarbonLoadModel,
table: CarbonTable,
storePath: String,
- isForceDeletion: Boolean) {
+ isForceDeletion: Boolean): Unit = {
if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
val loadMetadataFilePath = CarbonLoaderUtil
.extractLoadMetadataFileLocation(carbonLoadModel)
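configSplitMaxSize, removed from this factory above and relocated to CarbonTextFile by this commit, shrinks the input split size so that every core gets work when the input is smaller than parallelism times the block size; a hedged arithmetic sketch of that rule (the 16 MB floor mirrors CarbonCommonConstants.CARBON_16MB; the method name and Option return are assumptions for illustration):

// Illustrative only: returns the new mapreduce split max size, if one should be set.
def splitMaxSize(spaceConsumed: Long, defaultParallelism: Int, blockSize: Long): Option[Long] = {
  val parallelism = math.max(defaultParallelism, 1)
  if (spaceConsumed < parallelism.toLong * blockSize) {
    Some(math.max(spaceConsumed / parallelism, 16L * 1024 * 1024)) // floor at 16 MB
  } else {
    None // keep the default block-sized splits
  }
}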
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
deleted file mode 100644
index 343a602..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.load.LoadMetadataDetails
-import org.apache.carbondata.spark.DeletedLoadResult
-import org.apache.carbondata.spark.load.DeletedLoadMetadata
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-class CarbonDeleteLoadByDateRDD[K, V](
- sc: SparkContext,
- result: DeletedLoadResult[K, V],
- databaseName: String,
- tableName: String,
- dateField: String,
- dateFieldActualName: String,
- dateValue: String,
- factTableName: String,
- dimTableName: String,
- storePath: String,
- loadMetadataDetails: List[LoadMetadataDetails])
- extends RDD[(K, V)](sc, Nil) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map {s =>
- new CarbonLoadPartition(id, s._2, s._1)
- }
- }
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- new Iterator[(K, V)] {
- val deletedMetaData = new DeletedLoadMetadata()
- val split = theSplit.asInstanceOf[CarbonLoadPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
-
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- val partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
- // TODO call CARBON delete API
- logInfo("Applying data retention as per date value " + dateValue)
- var dateFormat = ""
- try {
- dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
- } catch {
- case e: Exception => logInfo("Unable to parse with default time format " + dateValue)
- }
- // TODO: Implement it
- var finished = false
-
- override def hasNext: Boolean = {
- finished
- }
-
- override def next(): (K, V) = {
- result.getKey(null, null)
- }
- }
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonLoadPartition]
- val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name: " + s.head + s.length)
- s
- }
-}
-
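
A note on the file removed above: CarbonDeleteLoadByDateRDD was effectively a stub. Both TODOs (the CARBON delete API call and the iterator itself) were never filled in, and its compute iterator appears to yield nothing, since hasNext simply returns finished, which is initialised to false and never updated, so next() is unreachable. For contrast, a minimal sketch of an iterator that would emit exactly one result per partition; doDelete is a hypothetical stand-in for the delete call the TODO refers to:

    // Hypothetical one-result-per-partition iterator; `doDelete` is a placeholder
    // for the "TODO call CARBON delete API" step in the removed class.
    def singleResult[V](doDelete: () => V): Iterator[V] = new Iterator[V] {
      private var finished = false

      override def hasNext: Boolean = !finished

      override def next(): V = {
        if (finished) throw new java.util.NoSuchElementException("End of stream")
        finished = true     // emit the single element, then report exhaustion
        doDelete()
      }
    }
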
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
deleted file mode 100644
index 26e1abc..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-class CarbonDeleteLoadRDD[V: ClassTag](
- sc: SparkContext,
- valueClass: Value[V],
- loadId: Int,
- databaseName: String,
- tableName: String,
- partitioner: Partitioner)
- extends RDD[V](sc, Nil) {
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map {f =>
- new CarbonLoadPartition(id, f._2, f._1)
- }
- }
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
- val iter = new Iterator[V] {
- val split = theSplit.asInstanceOf[CarbonLoadPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- // TODO call CARBON delete API
-
- var havePair = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = true
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- valueClass.getValue(null)
- }
-
- }
- logInfo("********Deleting***************")
- iter
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonLoadPartition]
- val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name: " + s.head + s.length)
- s
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
deleted file mode 100644
index 47689bd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-class CarbonDropTableRDD[V: ClassTag](
- sc: SparkContext,
- valueClass: Value[V],
- databaseName: String,
- tableName: String)
- extends RDD[V](sc, Nil) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map { s =>
- new CarbonLoadPartition(id, s._2, s._1)
- }
- }
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-
- val iter = new Iterator[V] {
- // TODO: Clear Btree from memory
-
- var havePair = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = true
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- valueClass.getValue(null)
- }
- }
- iter
- }
-}
-
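
A closing note on the three files removed above: they share the same scaffolding. Partitions are built from CarbonQueryUtil.getTableSplits, each wrapped in a CarbonLoadPartition, and preferred locations come from the wrapped split's hosts. As written, the havePair/finished bookkeeping in CarbonDeleteLoadRDD and CarbonDropTableRDD flips finished to true on the first hasNext call, so their compute iterators appear to yield no elements; the real per-partition work was still marked TODO, which is consistent with deleting them here. A condensed sketch of the shared scaffolding, assuming CarbonLoadPartition and CarbonQueryUtil as referenced in the deleted sources and a caller-supplied work function in place of the TODOs:

    import scala.collection.JavaConverters._
    import scala.reflect.ClassTag

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    import org.apache.carbondata.spark.util.CarbonQueryUtil

    // Skeleton only: `work` stands in for the per-partition action the removed
    // classes left as TODO (delete a load, drop cached B-tree state, ...).
    abstract class CarbonTableTaskRDD[V: ClassTag](
        sc: SparkContext,
        databaseName: String,
        tableName: String)
      extends RDD[V](sc, Nil) {

      sc.setLocalProperty("spark.scheduler.pool", "DDL")

      def work(split: CarbonLoadPartition): V

      // one partition per table split, exactly as the removed RDDs built them
      override def getPartitions: Array[Partition] = {
        val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
        splits.zipWithIndex.map { case (split, index) =>
          new CarbonLoadPartition(id, index, split)
        }
      }

      // emit exactly one result per partition once the work has run
      override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
        val split = theSplit.asInstanceOf[CarbonLoadPartition]
        Iterator.single(work(split))
      }

      // keep the task close to the split's hosts, as the removed RDDs did
      override def getPreferredLocations(split: Partition): Seq[String] = {
        split.asInstanceOf[CarbonLoadPartition]
          .serializableHadoopSplit.value.getLocations.asScala
      }
    }
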