Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/15 13:04:58 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #1702: Bootstrap datasource changes

vinothchandar commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r444571139



##########
File path: hudi-client/pom.xml
##########
@@ -101,6 +101,11 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>

Review comment:
       Is it possible to keep this dependency in `hudi-spark` only?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class HudiBootstrapRDD(@transient spark: SparkSession,
+                       dataReadFunction: PartitionedFile => Iterator[Any],
+                       skeletonReadFunction: PartitionedFile => Iterator[Any],
+                       regularReadFunction: PartitionedFile => Iterator[Any],
+                       dataSchema: StructType,
+                       skeletonSchema: StructType,
+                       requiredColumns: Array[String],
+                       tableState: HudiBootstrapTableState)
+  extends RDD[InternalRow](spark.sparkContext, Nil) {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val bootstrapPartition = split.asInstanceOf[HudiBootstrapPartition]
+
+    if (log.isDebugEnabled) {
+      if (bootstrapPartition.split.skeletonFile.isDefined) {
+        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+          + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
+          + bootstrapPartition.split.skeletonFile.get.filePath)
+      } else {
+        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+          + bootstrapPartition.split.dataFile.filePath)
+      }
+    }
+
+    var partitionedFileIterator: Iterator[InternalRow] = null
+
+    if (bootstrapPartition.split.skeletonFile.isDefined) {
+      // It is a bootstrap split. Check both skeleton and data files.
+      if (dataSchema.isEmpty) {
+        // No data column to fetch, hence fetch only from skeleton file
+        partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get,  skeletonReadFunction)
+      } else if (skeletonSchema.isEmpty) {
+        // No metadata column to fetch, hence fetch only from data file
+        partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+      } else {
+        // Fetch from both data and skeleton file, and merge
+        val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+        val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
+        partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
+      }
+    } else {
+      partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
+    }
+    partitionedFileIterator
+  }
+
+  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
+  : Iterator[InternalRow] = {
+    new Iterator[InternalRow] {
+      override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      override def next(): InternalRow = {
+        mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
+      }
+    }
+  }
+
+  def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {

Review comment:
       On avoiding the merge cost, my understanding is that it's hard for this case, where you actually need to merge values from the two rows, re-order them, etc.
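
A minimal Scala sketch of when that cost could be avoided (hypothetical, not part of this PR): if the requested columns happen to come out in skeleton-then-data order, Spark's `JoinedRow` can present the two rows as one without copying or re-ordering either side.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.JoinedRow

    object MergeSketch {
      // Field accesses on the joined row are delegated lazily to the two underlying rows,
      // so no values are copied; a genuine re-order still requires materializing the fields.
      def mergeWithoutReorder(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow =
        new JoinedRow(skeletonRow, dataRow)
    }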

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import scala.collection.JavaConverters._
+
+
+object HudiSparkUtils {
+
+  def getHudiMetadataSchema: StructType = {
+    StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
+        StructField(col, StringType, nullable = true)
+    }))
+  }
+
+  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
+    paths.flatMap(path => {
+      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+      globPaths
+    })
+  }
+
+  def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {

Review comment:
       Some of this is worth doing even in the regular (non-bootstrap) path, in place of the current path-filter approach?
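
For reference, a minimal sketch of what such a helper might look like (the body of `createInMemoryFileIndex` is truncated in the quoted hunk); it assumes the Spark 2.4 `InMemoryFileIndex` constructor and reuses the session-level `FileStatusCache` so repeated listings hit the cache.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

    object FileIndexHelperSketch {
      def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
        // Reuse Spark's session-level file-status cache so repeated listings are served from memory.
        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
        new InMemoryFileIndex(sparkSession, globbedPaths, Map.empty, Option.empty, fileStatusCache)
      }
    }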

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class HudiBootstrapRDD(@transient spark: SparkSession,
+                       dataReadFunction: PartitionedFile => Iterator[Any],
+                       skeletonReadFunction: PartitionedFile => Iterator[Any],
+                       regularReadFunction: PartitionedFile => Iterator[Any],
+                       dataSchema: StructType,
+                       skeletonSchema: StructType,
+                       requiredColumns: Array[String],
+                       tableState: HudiBootstrapTableState)
+  extends RDD[InternalRow](spark.sparkContext, Nil) {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val bootstrapPartition = split.asInstanceOf[HudiBootstrapPartition]
+
+    if (log.isDebugEnabled) {
+      if (bootstrapPartition.split.skeletonFile.isDefined) {
+        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+          + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
+          + bootstrapPartition.split.skeletonFile.get.filePath)
+      } else {
+        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+          + bootstrapPartition.split.dataFile.filePath)
+      }
+    }
+
+    var partitionedFileIterator: Iterator[InternalRow] = null
+
+    if (bootstrapPartition.split.skeletonFile.isDefined) {
+      // It is a bootstrap split. Check both skeleton and data files.
+      if (dataSchema.isEmpty) {
+        // No data column to fetch, hence fetch only from skeleton file
+        partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get,  skeletonReadFunction)
+      } else if (skeletonSchema.isEmpty) {
+        // No metadata column to fetch, hence fetch only from data file
+        partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+      } else {
+        // Fetch from both data and skeleton file, and merge
+        val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+        val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
+        partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
+      }
+    } else {
+      partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
+    }
+    partitionedFileIterator
+  }
+
+  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])

Review comment:
       +1 on wrapping the FileFormat here. Seems much simpler. Thanks @umehrot2 for this, educative!

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class HudiBootstrapRDD(@transient spark: SparkSession,
+                       dataReadFunction: PartitionedFile => Iterator[Any],
+                       skeletonReadFunction: PartitionedFile => Iterator[Any],
+                       regularReadFunction: PartitionedFile => Iterator[Any],
+                       dataSchema: StructType,
+                       skeletonSchema: StructType,
+                       requiredColumns: Array[String],
+                       tableState: HudiBootstrapTableState)
+  extends RDD[InternalRow](spark.sparkContext, Nil) {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val bootstrapPartition = split.asInstanceOf[HudiBootstrapPartition]
+
+    if (log.isDebugEnabled) {
+      if (bootstrapPartition.split.skeletonFile.isDefined) {
+        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+          + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
+          + bootstrapPartition.split.skeletonFile.get.filePath)
+      } else {
+        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+          + bootstrapPartition.split.dataFile.filePath)
+      }
+    }
+
+    var partitionedFileIterator: Iterator[InternalRow] = null
+
+    if (bootstrapPartition.split.skeletonFile.isDefined) {
+      // It is a bootstrap split. Check both skeleton and data files.
+      if (dataSchema.isEmpty) {
+        // No data column to fetch, hence fetch only from skeleton file
+        partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get,  skeletonReadFunction)
+      } else if (skeletonSchema.isEmpty) {
+        // No metadata column to fetch, hence fetch only from data file
+        partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+      } else {
+        // Fetch from both data and skeleton file, and merge
+        val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+        val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
+        partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
+      }
+    } else {
+      partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
+    }
+    partitionedFileIterator
+  }
+
+  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
+  : Iterator[InternalRow] = {
+    new Iterator[InternalRow] {
+      override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      override def next(): InternalRow = {
+        mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
+      }
+    }
+  }
+
+  def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
+    val skeletonArr  = skeletonRow.copy().toSeq(skeletonSchema)
+    val dataArr = dataRow.copy().toSeq(dataSchema)
+    // We need to return it in the order requested
+    val mergedArr = requiredColumns.map(col => {
+      if (skeletonSchema.fieldNames.contains(col)) {
+        val idx = skeletonSchema.fieldIndex(col)
+        skeletonArr(idx)
+      } else {
+        val idx = dataSchema.fieldIndex(col)
+        dataArr(idx)
+      }
+    })
+
+    logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
+    val mergedRow = InternalRow.fromSeq(mergedArr)
+    mergedRow
+  }
+
+  def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any])
+    : Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+
+    import scala.collection.JavaConverters._
+
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala

Review comment:
       For MOR tables, I have some ideas to speed things up by pre-reading the delete/rollback blocks and simply skipping those rows, as long as `OverwritewithLatestPayload` is used. If the user specifies a custom merge function, then it's hard to avoid the merge. We can take this discussion to a separate forum.
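
A rough sketch of that skip idea (hypothetical names, not from this PR): pre-read the delete/rollback blocks once per file group, collect the affected record keys, and drop those rows before they reach the merge. This is only valid when the payload semantics are latest-record-wins.

    import org.apache.spark.sql.catalyst.InternalRow

    object SkipRowsSketch {
      // deletedKeys would be built once per split from the pre-read delete/rollback blocks;
      // keyOrdinal is the position of the record-key column in the row.
      def skipDeleted(rows: Iterator[InternalRow], deletedKeys: Set[String], keyOrdinal: Int): Iterator[InternalRow] =
        rows.filterNot(row => deletedKeys.contains(row.getString(keyOrdinal)))
    }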

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+/**
+  * This is a Spark relation that can be used for querying metadata-bootstrapped and fully-bootstrapped Hudi tables,
+  * as well as non-bootstrapped tables. It implements the PrunedFilteredScan interface in order to support column
+  * pruning and filter push-down. For metadata-bootstrapped files, if columns are queried from both the metadata and
+  * the actual data, it performs a merge of the two to produce the result.
+  *
+  * Caveat: Filter push-down does not work when querying both metadata and actual data columns over metadata-
+  * bootstrapped files, because the metadata file and the data file can then return different numbers of rows,
+  * causing errors during the merge.
+  *
+  * @param _sqlContext Spark SQL Context
+  * @param userSchema User specified schema in the datasource query
+  * @param globPaths Globbed paths obtained from the user provided path for querying
+  * @param metaClient Hudi table meta client
+  * @param optParams DataSource options passed by the user
+  */
+class HudiBootstrapRelation(@transient val _sqlContext: SQLContext,
+                            val userSchema: StructType,
+                            val globPaths: Seq[Path],
+                            val metaClient: HoodieTableMetaClient,
+                            val optParams: Map[String, String]) extends BaseRelation
+  with PrunedFilteredScan with Logging {
+
+  val skeletonSchema: StructType = HudiSparkUtils.getHudiMetadataSchema
+  var dataSchema: StructType = _
+  var fullSchema: StructType = _
+
+  val fileIndex: HudiBootstrapFileIndex = buildFileIndex()
+
+  override def sqlContext: SQLContext = _sqlContext
+
+  override val needConversion: Boolean = false
+
+  override def schema: StructType = inferFullSchema()
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    logInfo("Starting scan..")
+
+    // Compute splits
+    val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
+      var skeletonFile: Option[PartitionedFile] = Option.empty
+      var dataFile: PartitionedFile = null
+
+      if (hoodieBaseFile.getExternalBaseFile.isPresent) {
+        skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
+        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getExternalBaseFile.get().getPath, 0,
+          hoodieBaseFile.getExternalBaseFile.get().getFileLen)
+      } else {
+        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
+      }
+      HudiBootstrapSplit(dataFile, skeletonFile)
+    })
+    val tableState = HudiBootstrapTableState(bootstrapSplits)
+
+    // Get required schemas for column pruning
+    var requiredDataSchema = StructType(Seq())
+    var requiredSkeletonSchema = StructType(Seq())
+    requiredColumns.foreach(col => {
+      var field = dataSchema.find(_.name == col)
+      if (field.isDefined) {
+        requiredDataSchema = requiredDataSchema.add(field.get)
+      } else {
+        field = skeletonSchema.find(_.name == col)
+        requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
+      }
+    })
+
+    // Prepare readers for reading data file and skeleton files
+    val dataReadFunction = new ParquetFileFormat()
+        .buildReaderWithPartitionValues(
+          sparkSession = _sqlContext.sparkSession,
+          dataSchema = dataSchema,
+          partitionSchema = StructType(Seq.empty),
+          requiredSchema = requiredDataSchema,
+          filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
+          options = Map.empty,
+          hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+        )
+
+    val skeletonReadFunction = new ParquetFileFormat()
+      .buildReaderWithPartitionValues(
+        sparkSession = _sqlContext.sparkSession,
+        dataSchema = skeletonSchema,
+        partitionSchema = StructType(Seq.empty),
+        requiredSchema = requiredSkeletonSchema,
+        filters = if (requiredDataSchema.isEmpty) filters else Seq(),
+        options = Map.empty,
+        hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      )
+
+    val regularReadFunction = new ParquetFileFormat()
+      .buildReaderWithPartitionValues(
+        sparkSession = _sqlContext.sparkSession,
+        dataSchema = fullSchema,
+        partitionSchema = StructType(Seq.empty),
+        requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields),
+        filters = filters,
+        options = Map.empty,
+        hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
+
+    val rdd = new HudiBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
+      regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)
+    rdd.asInstanceOf[RDD[Row]]
+  }
+
+  def inferFullSchema(): StructType = {
+    if (fullSchema == null) {
+      logInfo("Inferring schema..")
+      val schemaResolver = new TableSchemaResolver(metaClient)
+      val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+      dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+      fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
+    }
+    fullSchema
+  }
+
+  def buildFileIndex(): HudiBootstrapFileIndex = {
+    logInfo("Building file index..")
+    val inMemoryFileIndex = HudiSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
+    val fileStatuses = inMemoryFileIndex.allFiles()

Review comment:
       IIUC this will leverage Spark's file-listing cache, and we then filter on top of that based on our timeline.
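
A hedged sketch of that idea, assuming the 0.6.x file-system-view API: feed the cached file statuses from the `InMemoryFileIndex` into Hudi's file-system view so that only the latest base file per file group, per the completed commits timeline, is handed to the scan.

    import scala.collection.JavaConverters._

    import org.apache.hadoop.fs.FileStatus
    import org.apache.hudi.common.model.HoodieBaseFile
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView

    object TimelineFilterSketch {
      def latestBaseFiles(metaClient: HoodieTableMetaClient, fileStatuses: Seq[FileStatus]): List[HoodieBaseFile] = {
        // Only completed commits are visible, so in-flight writes never leak into the scan.
        val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
        val fsView = new HoodieTableFileSystemView(metaClient, timeline, fileStatuses.toArray)
        fsView.getLatestBaseFiles.iterator().asScala.toList
      }
    }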




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org