You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:56 UTC
[08/56] [abbrv] incubator-carbondata git commit: [issue-644]Update
CarbonFile to support Federation (#661)
[issue-644]Update CarbonFile to support Federation (#661)
* Update CarbonFile to support Federation
* Set DefaultFileType to VIEWFS when store-path schema is viewfs://
* fix checkstyle error
* fix instance error of CarbonFile#getParentFile return
* update #ViewFSCarbonFile follow of optimizing nn rpc request
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/25cd9e5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/25cd9e5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/25cd9e5d
Branch: refs/heads/master
Commit: 25cd9e5de13e2b9c561308be2a376ec458be5168
Parents: dbefb7b
Author: Hexiaoqiao <xq...@gmail.com>
Authored: Fri Jun 17 11:39:13 2016 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Fri Jun 17 09:09:13 2016 +0530
----------------------------------------------------------------------
.../fileperations/AtomicFileOperationsImpl.java | 3 +-
.../store/filesystem/AbstractDFSCarbonFile.java | 217 +++++++++++++++++
.../store/filesystem/HDFSCarbonFile.java | 243 +++----------------
.../store/filesystem/ViewFSCarbonFile.java | 126 ++++++++++
.../store/impl/DFSFileHolderImpl.java | 183 ++++++++++++++
.../datastorage/store/impl/FileFactory.java | 36 ++-
.../store/impl/HDFSFileHolderImpl.java | 186 --------------
.../org/carbondata/core/util/CarbonUtil.java | 23 +-
.../carbondata/hadoop/util/SchemaReader.java | 3 +-
.../csvreaderstep/BlockDataHandler.java | 3 +-
.../util/CarbonDataProcessorUtil.java | 12 +-
11 files changed, 613 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
index fe7a6f3..41f4580 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/fileperations/AtomicFileOperationsImpl.java
@@ -77,7 +77,8 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations {
CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
if (!tempFile.renameForce(filePath)) {
- throw new IOException("temporary file renaming failed");
+ throw new IOException("temporary file renaming failed, src="
+ + tempFile.getPath() + ", dest=" + filePath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
new file mode 100644
index 0000000..b04cd47
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.core.datastorage.store.filesystem;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public abstract class AbstractDFSCarbonFile implements CarbonFile {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName());
+ protected FileStatus fileStatus;
+ protected FileSystem fs;
+
+ public AbstractDFSCarbonFile(String filePath) {
+ filePath = filePath.replace("\\", "/");
+ Path path = new Path(filePath);
+ try {
+ fs = path.getFileSystem(FileFactory.getConfiguration());
+ fileStatus = fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ }
+ }
+
+ public AbstractDFSCarbonFile(Path path) {
+ try {
+ fs = path.getFileSystem(FileFactory.getConfiguration());
+ fileStatus = fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ }
+ }
+
+ public AbstractDFSCarbonFile(FileStatus fileStatus) {
+ this.fileStatus = fileStatus;
+ }
+
+ @Override public boolean createNewFile() {
+ Path path = fileStatus.getPath();
+ try {
+ return fs.createNewFile(path);
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override public String getAbsolutePath() {
+ return fileStatus.getPath().toString();
+ }
+
+ @Override public String getName() {
+ return fileStatus.getPath().getName();
+ }
+
+ @Override public boolean isDirectory() {
+ return fileStatus.isDirectory();
+ }
+
+ @Override public boolean exists() {
+ try {
+ if (null != fileStatus) {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ return fs.exists(fileStatus.getPath());
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ }
+ return false;
+ }
+
+ @Override public String getCanonicalPath() {
+ return getAbsolutePath();
+ }
+
+ @Override public String getPath() {
+ return getAbsolutePath();
+ }
+
+ @Override public long getSize() {
+ return fileStatus.getLen();
+ }
+
+ public boolean renameTo(String changetoName) {
+ FileSystem fs;
+ try {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ return fs.rename(fileStatus.getPath(), new Path(changetoName));
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ return false;
+ }
+ }
+
+ public boolean delete() {
+ FileSystem fs;
+ try {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ return fs.delete(fileStatus.getPath(), true);
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ return false;
+ }
+ }
+
+ @Override public long getLastModifiedTime() {
+ return fileStatus.getModificationTime();
+ }
+
+ @Override public boolean setLastModifiedTime(long timestamp) {
+ try {
+ fs.setTimes(fileStatus.getPath(), timestamp, timestamp);
+ } catch (IOException e) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This method will delete the data in file data from a given offset
+ */
+ @Override public boolean truncate(String fileName, long validDataEndOffset) {
+ DataOutputStream dataOutputStream = null;
+ DataInputStream dataInputStream = null;
+ boolean fileTruncatedSuccessfully = false;
+ // if bytes to read less than 1024 then buffer size should be equal to the given offset
+ int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
+ CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
+ (int) validDataEndOffset;
+ // temporary file name
+ String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+ FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+ try {
+ CarbonFile tempFile = null;
+ // delete temporary file if it already exists at a given path
+ if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
+ tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+ tempFile.delete();
+ }
+ // create new temporary file
+ FileFactory.createNewFile(tempWriteFilePath, fileType);
+ tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+ byte[] buff = new byte[bufferSize];
+ dataInputStream = FileFactory.getDataInputStream(fileName, fileType);
+ // read the data
+ int read = dataInputStream.read(buff, 0, buff.length);
+ dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
+ dataOutputStream.write(buff, 0, read);
+ long remaining = validDataEndOffset - read;
+ // anytime we should not cross the offset to be read
+ while (remaining > 0) {
+ if (remaining > bufferSize) {
+ buff = new byte[bufferSize];
+ } else {
+ buff = new byte[(int) remaining];
+ }
+ read = dataInputStream.read(buff, 0, buff.length);
+ dataOutputStream.write(buff, 0, read);
+ remaining = remaining - read;
+ }
+ CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+ // rename the temp file to original file
+ tempFile.renameForce(fileName);
+ fileTruncatedSuccessfully = true;
+ } catch (IOException e) {
+ LOGGER.error("Exception occured while truncating the file " + e.getMessage());
+ } finally {
+ CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
+ }
+ return fileTruncatedSuccessfully;
+ }
+
+ /**
+ * This method will be used to check whether a file has been modified or not
+ *
+ * @param fileTimeStamp time to be compared with latest timestamp of file
+ * @param endOffset file length to be compared with current length of file
+ * @return
+ */
+ @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
+ boolean isFileModified = false;
+ if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
+ isFileModified = true;
+ }
+ return isFileModified;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
index a4127f4..98e40b4 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
@@ -19,164 +19,36 @@
package org.carbondata.core.datastorage.store.filesystem;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.util.CarbonUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-public class HDFSCarbonFile implements CarbonFile {
+public class HDFSCarbonFile extends AbstractDFSCarbonFile {
/**
* LOGGER
*/
private static final LogService LOGGER =
LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());
- private FileStatus fileStatus;
- private FileSystem fs;
public HDFSCarbonFile(String filePath) {
- filePath = filePath.replace("\\", "/");
- Path path = new Path(filePath);
- try {
- fs = path.getFileSystem(FileFactory.getConfiguration());
- fileStatus = fs.getFileStatus(path);
- } catch (IOException e) {
- LOGGER.error("Exception occured: " + e.getMessage());
- }
+ super(filePath);
}
public HDFSCarbonFile(Path path) {
- try {
- fs = path.getFileSystem(FileFactory.getConfiguration());
- fileStatus = fs.getFileStatus(path);
- } catch (IOException e) {
- LOGGER.error("Exception occured: " + e.getMessage());
- }
+ super(path);
}
public HDFSCarbonFile(FileStatus fileStatus) {
- this.fileStatus = fileStatus;
- }
-
- @Override public boolean createNewFile() {
- Path path = fileStatus.getPath();
- try {
- return fs.createNewFile(path);
- } catch (IOException e) {
- return false;
- }
-
- }
-
- @Override public String getAbsolutePath() {
- return fileStatus.getPath().toString();
- }
-
- @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
- CarbonFile[] files = listFiles();
- if (files != null && files.length >= 1) {
-
- List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
- for (int i = 0; i < files.length; i++) {
- if (fileFilter.accept(files[i])) {
- fileList.add(files[i]);
- }
- }
-
- if (fileList.size() >= 1) {
- return fileList.toArray(new CarbonFile[fileList.size()]);
- } else {
- return new CarbonFile[0];
- }
- }
- return files;
- }
-
- @Override public String getName() {
- return fileStatus.getPath().getName();
- }
-
- @Override public boolean isDirectory() {
- return fileStatus.isDirectory();
- }
-
- @Override public boolean exists() {
- try {
- if (null != fileStatus) {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- return fs.exists(fileStatus.getPath());
- }
- } catch (IOException e) {
- LOGGER.error("Exception occured: " + e.getMessage());
- }
- return false;
- }
-
- @Override public String getCanonicalPath() {
- return getAbsolutePath();
- }
-
- @Override public CarbonFile getParentFile() {
- return new HDFSCarbonFile(fileStatus.getPath().getParent());
- }
-
- @Override public String getPath() {
- return getAbsolutePath();
- }
-
- @Override public long getSize() {
- return fileStatus.getLen();
- }
-
- public boolean renameTo(String changetoName) {
- FileSystem fs;
- try {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- return fs.rename(fileStatus.getPath(), new Path(changetoName));
- } catch (IOException e) {
- LOGGER.error("Exception occured: " + e.getMessage());
- return false;
- }
- }
-
- public boolean delete() {
- FileSystem fs;
- try {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- return fs.delete(fileStatus.getPath(), true);
- } catch (IOException e) {
- LOGGER.error("Exception occured" + e.getMessage());
- return false;
- }
- }
-
- @Override public CarbonFile[] listFiles() {
-
- FileStatus[] listStatus = null;
- try {
- if (null != fileStatus && fileStatus.isDirectory()) {
- Path path = fileStatus.getPath();
- listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
- } else {
- return null;
- }
- } catch (IOException ex) {
- LOGGER.error("Exception occured: " + ex.getMessage());
- return new CarbonFile[0];
- }
-
- return getFiles(listStatus);
+ super(fileStatus);
}
/**
@@ -187,98 +59,56 @@ public class HDFSCarbonFile implements CarbonFile {
if (listStatus == null) {
return new CarbonFile[0];
}
-
CarbonFile[] files = new CarbonFile[listStatus.length];
-
for (int i = 0; i < files.length; i++) {
files[i] = new HDFSCarbonFile(listStatus[i]);
}
return files;
}
- @Override public long getLastModifiedTime() {
- return fileStatus.getModificationTime();
- }
-
- @Override public boolean setLastModifiedTime(long timestamp) {
+ @Override
+ public CarbonFile[] listFiles() {
+ FileStatus[] listStatus = null;
try {
- fs.setTimes(fileStatus.getPath(), timestamp, timestamp);
+ if (null != fileStatus && fileStatus.isDirectory()) {
+ Path path = fileStatus.getPath();
+ listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+ } else {
+ return null;
+ }
} catch (IOException e) {
- return false;
+ LOGGER.error("Exception occured: " + e.getMessage());
+ return new CarbonFile[0];
}
- return true;
+ return getFiles(listStatus);
}
- /**
- * This method will delete the data in file data from a given offset
- */
- @Override public boolean truncate(String fileName, long validDataEndOffset) {
- DataOutputStream dataOutputStream = null;
- DataInputStream dataInputStream = null;
- boolean fileTruncatedSuccessfully = false;
- // if bytes to read less than 1024 then buffer size should be equal to the given offset
- int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
- CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
- (int) validDataEndOffset;
- // temporary file name
- String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
- FileFactory.FileType fileType = FileFactory.getFileType(fileName);
- try {
- CarbonFile tempFile = null;
- // delete temporary file if it already exists at a given path
- if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
- tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
- tempFile.delete();
- }
- // create new temporary file
- FileFactory.createNewFile(tempWriteFilePath, fileType);
- tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
- byte[] buff = new byte[bufferSize];
- dataInputStream = FileFactory.getDataInputStream(fileName, fileType);
- // read the data
- int read = dataInputStream.read(buff, 0, buff.length);
- dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
- dataOutputStream.write(buff, 0, read);
- long remaining = validDataEndOffset - read;
- // anytime we should not cross the offset to be read
- while (remaining > 0) {
- if (remaining > bufferSize) {
- buff = new byte[bufferSize];
- } else {
- buff = new byte[(int) remaining];
+ @Override
+ public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+ CarbonFile[] files = listFiles();
+ if (files != null && files.length >= 1) {
+ List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+ for (int i = 0; i < files.length; i++) {
+ if (fileFilter.accept(files[i])) {
+ fileList.add(files[i]);
}
- read = dataInputStream.read(buff, 0, buff.length);
- dataOutputStream.write(buff, 0, read);
- remaining = remaining - read;
}
- CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
- // rename the temp file to original file
- tempFile.renameForce(fileName);
- fileTruncatedSuccessfully = true;
- } catch (IOException e) {
- LOGGER.error("Exception occured while truncating the file " + e.getMessage());
- } finally {
- CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
+ if (fileList.size() >= 1) {
+ return fileList.toArray(new CarbonFile[fileList.size()]);
+ } else {
+ return new CarbonFile[0];
+ }
}
- return fileTruncatedSuccessfully;
+ return files;
}
- /**
- * This method will be used to check whether a file has been modified or not
- *
- * @param fileTimeStamp time to be compared with latest timestamp of file
- * @param endOffset file length to be compared with current length of file
- * @return
- */
- @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
- boolean isFileModified = false;
- if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
- isFileModified = true;
- }
- return isFileModified;
+ @Override
+ public CarbonFile getParentFile() {
+ return new HDFSCarbonFile(fileStatus.getPath().getParent());
}
- @Override public boolean renameForce(String changetoName) {
+ @Override
+ public boolean renameForce(String changetoName) {
FileSystem fs;
try {
fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
@@ -294,5 +124,4 @@ public class HDFSCarbonFile implements CarbonFile {
return false;
}
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
new file mode 100644
index 0000000..c7e4497
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.datastorage.store.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+
+public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName());
+
+ public ViewFSCarbonFile(String filePath) {
+ super(filePath);
+ }
+
+ public ViewFSCarbonFile(Path path) {
+ super(path);
+ }
+
+ public ViewFSCarbonFile(FileStatus fileStatus) {
+ super(fileStatus);
+ }
+
+ /**
+ * @param listStatus
+ * @return
+ */
+ private CarbonFile[] getFiles(FileStatus[] listStatus) {
+ if (listStatus == null) {
+ return new CarbonFile[0];
+ }
+ CarbonFile[] files = new CarbonFile[listStatus.length];
+ for (int i = 0; i < files.length; i++) {
+ files[i] = new ViewFSCarbonFile(listStatus[i]);
+ }
+ return files;
+ }
+
+ @Override
+ public CarbonFile[] listFiles() {
+ FileStatus[] listStatus = null;
+ try {
+ if (null != fileStatus && fileStatus.isDirectory()) {
+ Path path = fileStatus.getPath();
+ listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+ } else {
+ return null;
+ }
+ } catch (IOException ex) {
+ LOGGER.error("Exception occured" + ex.getMessage());
+ return new CarbonFile[0];
+ }
+ return getFiles(listStatus);
+ }
+
+ @Override
+ public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+ CarbonFile[] files = listFiles();
+ if (files != null && files.length >= 1) {
+ List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+ for (int i = 0; i < files.length; i++) {
+ if (fileFilter.accept(files[i])) {
+ fileList.add(files[i]);
+ }
+ }
+ if (fileList.size() >= 1) {
+ return fileList.toArray(new CarbonFile[fileList.size()]);
+ } else {
+ return new CarbonFile[0];
+ }
+ }
+ return files;
+ }
+
+ @Override
+ public CarbonFile getParentFile() {
+ return new ViewFSCarbonFile(fileStatus.getPath().getParent());
+ }
+
+ @Override
+ public boolean renameForce(String changetoName) {
+ FileSystem fs;
+ try {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ if (fs instanceof ViewFileSystem) {
+ fs.delete(new Path(changetoName), true);
+ fs.rename(fileStatus.getPath(), new Path(changetoName));
+ return true;
+ } else {
+ return false;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception occured" + e.getMessage());
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
new file mode 100644
index 0000000..653c243
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.datastorage.store.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.FileHolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class DFSFileHolderImpl implements FileHolder {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DFSFileHolderImpl.class.getName());
+ /**
+ * cache to hold filename and its stream
+ */
+ private Map<String, FSDataInputStream> fileNameAndStreamCache;
+
+ public DFSFileHolderImpl() {
+ this.fileNameAndStreamCache =
+ new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ @Override public byte[] readByteArray(String filePath, long offset, int length) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ byte[] byteBffer = read(fileChannel, length, offset);
+ return byteBffer;
+ }
+
+ /**
+ * This method will be used to check whether stream is already present in
+ * cache or not for filepath if not present then create it and then add to
+ * cache, other wise get from cache
+ *
+ * @param filePath fully qualified file path
+ * @return channel
+ */
+ private FSDataInputStream updateCache(String filePath) {
+ FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
+ try {
+ if (null == fileChannel) {
+ Path pt = new Path(filePath);
+ FileSystem fs = pt.getFileSystem(new Configuration());
+ fileChannel = fs.open(pt);
+ fileNameAndStreamCache.put(filePath, fileChannel);
+ }
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return fileChannel;
+ }
+
+ /**
+ * This method will be used to read from file based on number of bytes to be read and positon
+ *
+ * @param channel file channel
+ * @param size number of bytes
+ * @param offset position
+ * @return byte buffer
+ */
+ private byte[] read(FSDataInputStream channel, int size, long offset) {
+ byte[] byteBffer = new byte[size];
+ try {
+ channel.seek(offset);
+ channel.readFully(byteBffer);
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return byteBffer;
+ }
+
+ /**
+ * This method will be used to read from file based on number of bytes to be read and positon
+ *
+ * @param channel file channel
+ * @param size number of bytes
+ * @return byte buffer
+ */
+ private byte[] read(FSDataInputStream channel, int size) {
+ byte[] byteBffer = new byte[size];
+ try {
+ channel.readFully(byteBffer);
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return byteBffer;
+ }
+
+ @Override public int readInt(String filePath, long offset) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ int i = -1;
+ try {
+ fileChannel.seek(offset);
+ i = fileChannel.readInt();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+
+ return i;
+ }
+
+ @Override public long readDouble(String filePath, long offset) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ long i = -1;
+ try {
+ fileChannel.seek(offset);
+ i = fileChannel.readLong();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+
+ return i;
+ }
+
+ @Override public void finish() {
+ for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) {
+ try {
+ FSDataInputStream channel = entry.getValue();
+ if (null != channel) {
+ channel.close();
+ }
+ } catch (IOException exception) {
+ LOGGER.error(exception, exception.getMessage());
+ }
+ }
+
+ }
+
+ @Override public byte[] readByteArray(String filePath, int length) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ byte[] byteBffer = read(fileChannel, length);
+ return byteBffer;
+ }
+
+ @Override public long readLong(String filePath, long offset) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ long i = -1;
+ try {
+ fileChannel.seek(offset);
+ i = fileChannel.readLong();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return i;
+ }
+
+ @Override public int readInt(String filePath) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ int i = -1;
+ try {
+ i = fileChannel.readInt();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return i;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
index 11645fc..c88ade8 100644
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -32,6 +32,7 @@ import org.carbondata.core.datastorage.store.FileHolder;
import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
import org.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
import org.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
+import org.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile;
import org.carbondata.core.util.CarbonUtil;
import org.apache.commons.io.FileUtils;
@@ -49,8 +50,10 @@ public final class FileFactory {
static {
String property = CarbonUtil.getCarbonStorePath(null, null);
if (property != null) {
- if (property.startsWith("hdfs://")) {
+ if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
storeDefaultFileType = FileType.HDFS;
+ } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+ storeDefaultFileType = FileType.VIEWFS;
}
}
@@ -71,7 +74,8 @@ public final class FileFactory {
case LOCAL:
return new FileHolderImpl();
case HDFS:
- return new HDFSFileHolderImpl();
+ case VIEWFS:
+ return new DFSFileHolderImpl();
default:
return new FileHolderImpl();
}
@@ -80,16 +84,20 @@ public final class FileFactory {
public static FileType getFileType() {
String property = CarbonUtil.getCarbonStorePath(null, null);
if (property != null) {
- if (property.startsWith("hdfs://")) {
+ if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
storeDefaultFileType = FileType.HDFS;
+ } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+ storeDefaultFileType = FileType.VIEWFS;
}
}
return storeDefaultFileType;
}
public static FileType getFileType(String path) {
- if (path.startsWith("hdfs://")) {
+ if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
return FileType.HDFS;
+ } else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+ return FileType.VIEWFS;
}
return FileType.LOCAL;
}
@@ -100,6 +108,8 @@ public final class FileFactory {
return new LocalCarbonFile(path);
case HDFS:
return new HDFSCarbonFile(path);
+ case VIEWFS:
+ return new ViewFSCarbonFile(path);
default:
return new LocalCarbonFile(path);
}
@@ -112,6 +122,7 @@ public final class FileFactory {
case LOCAL:
return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataInputStream stream = fs.open(pt);
@@ -128,6 +139,7 @@ public final class FileFactory {
case LOCAL:
return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataInputStream stream = fs.open(pt, bufferSize);
@@ -152,6 +164,7 @@ public final class FileFactory {
path = path.replace("\\", "/");
switch (fileType) {
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataInputStream stream = fs.open(pt, bufferSize);
@@ -176,6 +189,7 @@ public final class FileFactory {
case LOCAL:
return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataOutputStream stream = fs.create(pt, true);
@@ -192,6 +206,7 @@ public final class FileFactory {
case LOCAL:
return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataOutputStream stream = fs.create(pt, replicationFactor);
@@ -209,6 +224,7 @@ public final class FileFactory {
return new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(path), bufferSize));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataOutputStream stream = fs.create(pt, true, bufferSize);
@@ -227,6 +243,7 @@ public final class FileFactory {
return new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataOutputStream stream = null;
@@ -256,6 +273,7 @@ public final class FileFactory {
return new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(path), bufferSize));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataOutputStream stream =
@@ -280,6 +298,7 @@ public final class FileFactory {
filePath = filePath.replace("\\", "/");
switch (fileType) {
case HDFS:
+ case VIEWFS:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
if (performFileCheck) {
@@ -311,6 +330,7 @@ public final class FileFactory {
filePath = filePath.replace("\\", "/");
switch (fileType) {
case HDFS:
+ case VIEWFS:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.exists(path);
@@ -326,6 +346,7 @@ public final class FileFactory {
filePath = filePath.replace("\\", "/");
switch (fileType) {
case HDFS:
+ case VIEWFS:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.createNewFile(path);
@@ -341,6 +362,7 @@ public final class FileFactory {
filePath = filePath.replace("\\", "/");
switch (fileType) {
case HDFS:
+ case VIEWFS:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.mkdirs(path);
@@ -366,6 +388,7 @@ public final class FileFactory {
case LOCAL:
return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
case HDFS:
+ case VIEWFS:
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FSDataOutputStream stream = fs.append(pt);
@@ -388,6 +411,7 @@ public final class FileFactory {
filePath = filePath.replace("\\", "/");
switch (fileType) {
case HDFS:
+ case VIEWFS:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
if (fs.createNewFile(path)) {
@@ -403,7 +427,7 @@ public final class FileFactory {
}
public enum FileType {
- LOCAL, HDFS
+ LOCAL, HDFS, VIEWFS
}
/**
@@ -418,6 +442,7 @@ public final class FileFactory {
FileType fileType = getFileType(filePath);
switch (fileType) {
case HDFS:
+ case VIEWFS:
return filePath;
case LOCAL:
default:
@@ -438,6 +463,7 @@ public final class FileFactory {
FileType fileType = getFileType(filePath);
switch (fileType) {
case HDFS:
+ case VIEWFS:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
return fs.getContentSummary(path).getLength();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java
deleted file mode 100644
index 6e753e6..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/HDFSFileHolderImpl.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.impl;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.FileHolder;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class HDFSFileHolderImpl implements FileHolder {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(HDFSFileHolderImpl.class.getName());
- /**
- * cache to hold filename and its stream
- */
- private Map<String, FSDataInputStream> fileNameAndStreamCache;
-
- public HDFSFileHolderImpl() {
- this.fileNameAndStreamCache =
- new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- }
-
- @Override public byte[] readByteArray(String filePath, long offset, int length) {
-
- FSDataInputStream fileChannel = updateCache(filePath);
- byte[] byteBffer = read(fileChannel, length, offset);
- return byteBffer;
- }
-
- /**
- * This method will be used to check whether stream is already present in
- * cache or not for filepath if not present then create it and then add to
- * cache, other wise get from cache
- *
- * @param filePath fully qualified file path
- * @return channel
- */
- private FSDataInputStream updateCache(String filePath) {
- FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
- try {
- if (null == fileChannel) {
- Path pt = new Path(filePath);
- FileSystem fs = pt.getFileSystem(new Configuration());
- fileChannel = fs.open(pt);
- fileNameAndStreamCache.put(filePath, fileChannel);
- }
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return fileChannel;
- }
-
- /**
- * This method will be used to read from file based on number of bytes to be read and positon
- *
- * @param channel file channel
- * @param size number of bytes
- * @param offset position
- * @return byte buffer
- */
- private byte[] read(FSDataInputStream channel, int size, long offset) {
- byte[] byteBffer = new byte[size];
- try {
- channel.seek(offset);
- channel.readFully(byteBffer);
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- return byteBffer;
- }
-
- /**
- * This method will be used to read from file based on number of bytes to be read and positon
- *
- * @param channel file channel
- * @param size number of bytes
- * @return byte buffer
- */
- private byte[] read(FSDataInputStream channel, int size) {
- byte[] byteBffer = new byte[size];
- try {
- channel.readFully(byteBffer);
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- return byteBffer;
- }
-
- @Override public int readInt(String filePath, long offset) {
- FSDataInputStream fileChannel = updateCache(filePath);
- int i = -1;
- try {
- fileChannel.seek(offset);
- i = fileChannel.readInt();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
-
- return i;
- }
-
- @Override public long readDouble(String filePath, long offset) {
- FSDataInputStream fileChannel = updateCache(filePath);
- long i = -1;
- try {
- fileChannel.seek(offset);
- i = fileChannel.readLong();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
-
- return i;
- }
-
- @Override public void finish() {
- for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) {
- try {
- FSDataInputStream channel = entry.getValue();
- if (null != channel) {
- channel.close();
- }
- } catch (IOException exception) {
- LOGGER.error(exception, exception.getMessage());
- }
- }
-
- }
-
- @Override public byte[] readByteArray(String filePath, int length) {
- FSDataInputStream fileChannel = updateCache(filePath);
- byte[] byteBffer = read(fileChannel, length);
- return byteBffer;
- }
-
- @Override public long readLong(String filePath, long offset) {
- FSDataInputStream fileChannel = updateCache(filePath);
- long i = -1;
- try {
- fileChannel.seek(offset);
- i = fileChannel.readLong();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return i;
- }
-
- @Override public int readInt(String filePath) {
- FSDataInputStream fileChannel = updateCache(filePath);
- int i = -1;
- try {
- i = fileChannel.readInt();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return i;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
index 2772362..2910105 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
@@ -81,8 +81,8 @@ import org.pentaho.di.core.exception.KettleException;
public final class CarbonUtil {
- private static final String HDFS_PREFIX = "hdfs://";
-
+ public static final String HDFS_PREFIX = "hdfs://";
+ public static final String VIEWFS_PREFIX = "viewfs://";
private static final String FS_DEFAULT_FS = "fs.defaultFS";
/**
@@ -1213,21 +1213,22 @@ public final class CarbonUtil {
public static String checkAndAppendHDFSUrl(String filePath) {
String currentPath = filePath;
if (null != filePath && filePath.length() != 0 &&
- FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS) {
- String baseHDFSUrl = CarbonProperties.getInstance()
+ FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS &&
+ FileFactory.getFileType(filePath) != FileFactory.FileType.VIEWFS) {
+ String baseDFSUrl = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL);
- if (null != baseHDFSUrl) {
- String hdfsUrl = conf.get(FS_DEFAULT_FS);
- if (hdfsUrl.startsWith(HDFS_PREFIX)) {
- baseHDFSUrl = hdfsUrl + baseHDFSUrl;
+ if (null != baseDFSUrl) {
+ String dfsUrl = conf.get(FS_DEFAULT_FS);
+ if (dfsUrl.startsWith(HDFS_PREFIX) || dfsUrl.startsWith(VIEWFS_PREFIX)) {
+ baseDFSUrl = dfsUrl + baseDFSUrl;
}
- if (baseHDFSUrl.endsWith("/")) {
- baseHDFSUrl = baseHDFSUrl.substring(0, baseHDFSUrl.length() - 1);
+ if (baseDFSUrl.endsWith("/")) {
+ baseDFSUrl = baseDFSUrl.substring(0, baseDFSUrl.length() - 1);
}
if (!filePath.startsWith("/")) {
filePath = "/" + filePath;
}
- currentPath = baseHDFSUrl + filePath;
+ currentPath = baseDFSUrl + filePath;
}
}
return currentPath;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
index 7f9e56d..31b0cfc 100644
--- a/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/carbondata/hadoop/util/SchemaReader.java
@@ -22,7 +22,8 @@ public class SchemaReader {
public CarbonTable readCarbonTableFromStore(CarbonTablePath carbonTablePath,
CarbonTableIdentifier tableIdentifier, String storePath) throws IOException {
String schemaFilePath = carbonTablePath.getSchemaFilePath();
- if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS)) {
+ if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS)
+ || FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
String tableName = tableIdentifier.getTableName();
ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
index 575fd6a..d832504 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
@@ -177,7 +177,8 @@ public class BlockDataHandler {
}
// Open the next one...
- if (FileFactory.getFileType(blockDetails.getFilePath()) == FileFactory.FileType.HDFS) {
+ if (FileFactory.getFileType(blockDetails.getFilePath()) == FileFactory.FileType.HDFS
+ || FileFactory.getFileType(blockDetails.getFilePath()) == FileFactory.FileType.VIEWFS) {
//when case HDFS file type, we use the file path directly
//give 0 offset as the file start offset when open a new file
initializeFileReader(blockDetails.getFilePath(), 0);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/25cd9e5d/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
index 4c361fc..a64a256 100644
--- a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -31,10 +31,7 @@ import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.carbondata.core.carbon.path.CarbonStorePath;
import org.carbondata.core.carbon.path.CarbonTablePath;
import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
+import org.carbondata.core.datastorage.store.filesystem.*;
import org.carbondata.core.datastorage.store.impl.FileFactory;
import org.carbondata.core.datastorage.store.impl.FileFactory.FileType;
import org.carbondata.core.load.LoadMetadataDetails;
@@ -125,12 +122,7 @@ public final class CarbonDataProcessorUtil {
} catch (IOException e1) {
LOGGER.info("bad record folder does not exist");
}
- CarbonFile carbonFile = null;
- if (fileType.equals(FileFactory.FileType.HDFS)) {
- carbonFile = new HDFSCarbonFile(badLogStoreLocation);
- } else {
- carbonFile = new LocalCarbonFile(badLogStoreLocation);
- }
+ CarbonFile carbonFile = FileFactory.getCarbonFile(badLogStoreLocation, fileType);
CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile pathname) {