You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/01 18:17:39 UTC
spark git commit: [SPARK-18103][FOLLOW-UP][SQL][MINOR] Rename
`MetadataLogFileCatalog` to `MetadataLogFileIndex`
Repository: spark
Updated Branches:
refs/heads/master 8ac09108f -> 8cdf143f4
[SPARK-18103][FOLLOW-UP][SQL][MINOR] Rename `MetadataLogFileCatalog` to `MetadataLogFileIndex`
## What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/15634.
## How was this patch tested?
N/A
Author: Liwei Lin <lw...@gmail.com>
Closes #15712 from lw-lin/18103.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cdf143f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cdf143f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cdf143f
Branch: refs/heads/master
Commit: 8cdf143f4b1ca5c6bc0256808e6f42d9ef299cbd
Parents: 8ac0910
Author: Liwei Lin <lw...@gmail.com>
Authored: Tue Nov 1 11:17:35 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 11:17:35 2016 -0700
----------------------------------------------------------------------
.../streaming/MetadataLogFileCatalog.scala | 60 --------------------
.../streaming/MetadataLogFileIndex.scala | 60 ++++++++++++++++++++
2 files changed, 60 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
deleted file mode 100644
index aeaa134..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources._
-
-
-/**
- * A [[FileIndex]] that generates the list of files to process by reading them from the
- * metadata log files generated by the [[FileStreamSink]].
- */
-class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
- extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
- // Location of the sink's metadata log: the FileStreamSink.metadataDir entry under `path`.
- private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
- logInfo(s"Reading streaming file log from $metadataDirectory")
- private val metadataLog =
- new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString)
- private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
- private var cachedPartitionSpec: PartitionSpec = _ // null until partitionSpec() is first called
- // Path -> status for every non-directory log entry, in the order allFiles() returned them.
- override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
- new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f)
- }
- // The same files, grouped by their parent directory.
- override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
- allFilesFromLog.toArray.groupBy(_.getPath.getParent)
- }
- // The single root is the `path` this index was constructed with.
- override def rootPaths: Seq[Path] = path :: Nil
- // No-op: the log is read once into vals at construction and is not re-read here.
- override def refresh(): Unit = { }
- // Infers partitioning on the first call and reuses the cached result afterwards.
- override def partitionSpec(): PartitionSpec = {
- if (cachedPartitionSpec == null) {
- cachedPartitionSpec = inferPartitioning()
- }
- cachedPartitionSpec
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
new file mode 100644
index 0000000..aeaa134
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources._
+
+
+/**
+ * A [[FileIndex]] that generates the list of files to process by reading them from the
+ * metadata log files generated by the [[FileStreamSink]].
+ */
+class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
+ extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
+ // Location of the sink's metadata log: the FileStreamSink.metadataDir entry under `path`.
+ private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+ logInfo(s"Reading streaming file log from $metadataDirectory")
+ private val metadataLog =
+ new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString)
+ private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
+ private var cachedPartitionSpec: PartitionSpec = _ // null until partitionSpec() is first called
+ // Path -> status for every non-directory log entry, in the order allFiles() returned them.
+ override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+ new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f)
+ }
+ // The same files, grouped by their parent directory.
+ override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+ allFilesFromLog.toArray.groupBy(_.getPath.getParent)
+ }
+ // The single root is the `path` this index was constructed with.
+ override def rootPaths: Seq[Path] = path :: Nil
+ // No-op: the log is read once into vals at construction and is not re-read here.
+ override def refresh(): Unit = { }
+ // Infers partitioning on the first call and reuses the cached result afterwards.
+ override def partitionSpec(): PartitionSpec = {
+ if (cachedPartitionSpec == null) {
+ cachedPartitionSpec = inferPartitioning()
+ }
+ cachedPartitionSpec
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org