Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/01 18:17:39 UTC

spark git commit: [SPARK-18103][FOLLOW-UP][SQL][MINOR] Rename `MetadataLogFileCatalog` to `MetadataLogFileIndex`

Repository: spark
Updated Branches:
  refs/heads/master 8ac09108f -> 8cdf143f4


[SPARK-18103][FOLLOW-UP][SQL][MINOR] Rename `MetadataLogFileCatalog` to `MetadataLogFileIndex`

## What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/15634: it renames the file `MetadataLogFileCatalog.scala` to `MetadataLogFileIndex.scala` so that the file name matches the `MetadataLogFileIndex` class it already defines.

## How was this patch tested?

N/A

Author: Liwei Lin <lw...@gmail.com>

Closes #15712 from lw-lin/18103.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cdf143f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cdf143f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cdf143f

Branch: refs/heads/master
Commit: 8cdf143f4b1ca5c6bc0256808e6f42d9ef299cbd
Parents: 8ac0910
Author: Liwei Lin <lw...@gmail.com>
Authored: Tue Nov 1 11:17:35 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 11:17:35 2016 -0700

----------------------------------------------------------------------
 .../streaming/MetadataLogFileCatalog.scala      | 60 --------------------
 .../streaming/MetadataLogFileIndex.scala        | 60 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
deleted file mode 100644
index aeaa134..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources._
-
-
-/**
- * A [[FileIndex]] that generates the list of files to process by reading them from the
- * metadata log files generated by the [[FileStreamSink]].
- */
-class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
-  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
-
-  private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
-  logInfo(s"Reading streaming file log from $metadataDirectory")
-  private val metadataLog =
-    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString)
-  private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
-  private var cachedPartitionSpec: PartitionSpec = _
-
-  override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
-    new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f)
-  }
-
-  override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
-    allFilesFromLog.toArray.groupBy(_.getPath.getParent)
-  }
-
-  override def rootPaths: Seq[Path] = path :: Nil
-
-  override def refresh(): Unit = { }
-
-  override def partitionSpec(): PartitionSpec = {
-    if (cachedPartitionSpec == null) {
-      cachedPartitionSpec = inferPartitioning()
-    }
-    cachedPartitionSpec
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
new file mode 100644
index 0000000..aeaa134
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources._
+
+
+/**
+ * A [[FileIndex]] that generates the list of files to process by reading them from the
+ * metadata log files generated by the [[FileStreamSink]].
+ */
+class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
+  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
+
+  private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+  logInfo(s"Reading streaming file log from $metadataDirectory")
+  private val metadataLog =
+    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString)
+  private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
+  private var cachedPartitionSpec: PartitionSpec = _
+
+  override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+    new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f)
+  }
+
+  override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+    allFilesFromLog.toArray.groupBy(_.getPath.getParent)
+  }
+
+  override def rootPaths: Seq[Path] = path :: Nil
+
+  override def refresh(): Unit = { }
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+      cachedPartitionSpec = inferPartitioning()
+    }
+    cachedPartitionSpec
+  }
+}
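
Below is a minimal usage sketch (not part of this commit) showing how the renamed index could be constructed against a directory previously written by a FileStreamSink. The constructor and the rootPaths/partitionSpec members come from the diff above; the output path, object name, and local-mode SparkSession setup are hypothetical example scaffolding.

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex

  object MetadataLogFileIndexSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("MetadataLogFileIndexSketch")
        .getOrCreate()

      // Hypothetical directory that a FileStreamSink has written to; it is expected
      // to contain the FileStreamSink.metadataDir subdirectory with the sink's log.
      val sinkOutputPath = new Path("/tmp/stream-output")

      // The index reads its file listing from the sink's metadata log rather than
      // listing the file system directly.
      val fileIndex = new MetadataLogFileIndex(spark, sinkOutputPath)

      println(s"Root paths: ${fileIndex.rootPaths.mkString(", ")}")
      println(s"Partition columns: ${fileIndex.partitionSpec().partitionColumns}")

      spark.stop()
    }
  }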

