You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:25 UTC
[40/52] [partial] incubator-carbondata git commit: Renamed packages
to org.apache.carbondata and fixed errors
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
new file mode 100644
index 0000000..26af405
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.filesystem;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public abstract class AbstractDFSCarbonFile implements CarbonFile {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName());
+ protected FileStatus fileStatus;
+ protected FileSystem fs;
+
+ public AbstractDFSCarbonFile(String filePath) {
+ filePath = filePath.replace("\\", "/");
+ Path path = new Path(filePath);
+ try {
+ fs = path.getFileSystem(FileFactory.getConfiguration());
+ fileStatus = fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ }
+ }
+
+ public AbstractDFSCarbonFile(Path path) {
+ try {
+ fs = path.getFileSystem(FileFactory.getConfiguration());
+ fileStatus = fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ }
+ }
+
+ public AbstractDFSCarbonFile(FileStatus fileStatus) {
+ this.fileStatus = fileStatus;
+ }
+
+ @Override public boolean createNewFile() {
+ Path path = fileStatus.getPath();
+ try {
+ return fs.createNewFile(path);
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override public String getAbsolutePath() {
+ return fileStatus.getPath().toString();
+ }
+
+ @Override public String getName() {
+ return fileStatus.getPath().getName();
+ }
+
+ @Override public boolean isDirectory() {
+ return fileStatus.isDirectory();
+ }
+
+ @Override public boolean exists() {
+ try {
+ if (null != fileStatus) {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ return fs.exists(fileStatus.getPath());
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ }
+ return false;
+ }
+
+ @Override public String getCanonicalPath() {
+ return getAbsolutePath();
+ }
+
+ @Override public String getPath() {
+ return getAbsolutePath();
+ }
+
+ @Override public long getSize() {
+ return fileStatus.getLen();
+ }
+
+ public boolean renameTo(String changetoName) {
+ FileSystem fs;
+ try {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ return fs.rename(fileStatus.getPath(), new Path(changetoName));
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ return false;
+ }
+ }
+
+ public boolean delete() {
+ FileSystem fs;
+ try {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ return fs.delete(fileStatus.getPath(), true);
+ } catch (IOException e) {
+ LOGGER.error("Exception occured:" + e.getMessage());
+ return false;
+ }
+ }
+
+ @Override public long getLastModifiedTime() {
+ return fileStatus.getModificationTime();
+ }
+
+ @Override public boolean setLastModifiedTime(long timestamp) {
+ try {
+ fs.setTimes(fileStatus.getPath(), timestamp, timestamp);
+ } catch (IOException e) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This method will delete the data in file data from a given offset
+ */
+ @Override public boolean truncate(String fileName, long validDataEndOffset) {
+ DataOutputStream dataOutputStream = null;
+ DataInputStream dataInputStream = null;
+ boolean fileTruncatedSuccessfully = false;
+ // if bytes to read less than 1024 then buffer size should be equal to the given offset
+ int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
+ CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
+ (int) validDataEndOffset;
+ // temporary file name
+ String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+ FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+ try {
+ CarbonFile tempFile = null;
+ // delete temporary file if it already exists at a given path
+ if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
+ tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+ tempFile.delete();
+ }
+ // create new temporary file
+ FileFactory.createNewFile(tempWriteFilePath, fileType);
+ tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+ byte[] buff = new byte[bufferSize];
+ dataInputStream = FileFactory.getDataInputStream(fileName, fileType);
+ // read the data
+ int read = dataInputStream.read(buff, 0, buff.length);
+ dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
+ dataOutputStream.write(buff, 0, read);
+ long remaining = validDataEndOffset - read;
+ // anytime we should not cross the offset to be read
+ while (remaining > 0) {
+ if (remaining > bufferSize) {
+ buff = new byte[bufferSize];
+ } else {
+ buff = new byte[(int) remaining];
+ }
+ read = dataInputStream.read(buff, 0, buff.length);
+ dataOutputStream.write(buff, 0, read);
+ remaining = remaining - read;
+ }
+ CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+ // rename the temp file to original file
+ tempFile.renameForce(fileName);
+ fileTruncatedSuccessfully = true;
+ } catch (IOException e) {
+ LOGGER.error("Exception occured while truncating the file " + e.getMessage());
+ } finally {
+ CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
+ }
+ return fileTruncatedSuccessfully;
+ }
+
+ /**
+ * This method will be used to check whether a file has been modified or not
+ *
+ * @param fileTimeStamp time to be compared with latest timestamp of file
+ * @param endOffset file length to be compared with current length of file
+ * @return
+ */
+ @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
+ boolean isFileModified = false;
+ if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
+ isFileModified = true;
+ }
+ return isFileModified;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java
new file mode 100644
index 0000000..642055b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.filesystem;
+
/**
 * Abstraction of a file or directory on a concrete storage system
 * (local disk, HDFS, ViewFS, ...), mirroring a subset of java.io.File
 * so carbon code can work with any backing filesystem uniformly.
 */
public interface CarbonFile {

  /** Returns the absolute path of this file. */
  String getAbsolutePath();

  /** Lists the children of this directory that the given filter accepts. */
  CarbonFile[] listFiles(CarbonFileFilter fileFilter);

  /** Lists all children of this directory. */
  CarbonFile[] listFiles();

  /** Returns the final path component (file or directory name). */
  String getName();

  /** Returns true when this path refers to a directory. */
  boolean isDirectory();

  /** Returns true when this path currently exists on the backing store. */
  boolean exists();

  /** Returns the canonical form of this path. */
  String getCanonicalPath();

  /** Returns the parent directory of this path. */
  CarbonFile getParentFile();

  /** Returns the path of this file. */
  String getPath();

  /** Returns the length of the file in bytes. */
  long getSize();

  /** Renames this file; implementations may fail if the target exists. */
  boolean renameTo(String changetoName);

  /** Renames this file, overwriting the target if it already exists. */
  boolean renameForce(String changetoName);

  /** Deletes this file (recursively for directories, per implementation). */
  boolean delete();

  /** Creates the file if it does not already exist. */
  boolean createNewFile();

  /** Returns the last-modified timestamp of the file. */
  long getLastModifiedTime();

  /** Sets the last-modified timestamp; returns true on success. */
  boolean setLastModifiedTime(long timestamp);

  /** Truncates the named file so only the first validDataEndOffset bytes remain. */
  boolean truncate(String fileName, long validDataEndOffset);

  /**
   * This method will be used to check whether a file has been modified or not
   *
   * @param fileTimeStamp time to be compared with latest timestamp of file
   * @param endOffset file length to be compared with current length of file
   * @return true when the file is newer or longer than the given reference values
   */
  boolean isFileModified(long fileTimeStamp, long endOffset);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
new file mode 100644
index 0000000..7db3b2b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.filesystem;
+
/**
 * Filter callback used by {@link CarbonFile#listFiles(CarbonFileFilter)}
 * to decide which children to include in a directory listing.
 */
public interface CarbonFileFilter {
  /** Returns true if the given file should be included in the listing. */
  boolean accept(CarbonFile file);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
new file mode 100644
index 0000000..ebe18e4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+public class HDFSCarbonFile extends AbstractDFSCarbonFile {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());
+
+ public HDFSCarbonFile(String filePath) {
+ super(filePath);
+ }
+
+ public HDFSCarbonFile(Path path) {
+ super(path);
+ }
+
+ public HDFSCarbonFile(FileStatus fileStatus) {
+ super(fileStatus);
+ }
+
+ /**
+ * @param listStatus
+ * @return
+ */
+ private CarbonFile[] getFiles(FileStatus[] listStatus) {
+ if (listStatus == null) {
+ return new CarbonFile[0];
+ }
+ CarbonFile[] files = new CarbonFile[listStatus.length];
+ for (int i = 0; i < files.length; i++) {
+ files[i] = new HDFSCarbonFile(listStatus[i]);
+ }
+ return files;
+ }
+
+ @Override
+ public CarbonFile[] listFiles() {
+ FileStatus[] listStatus = null;
+ try {
+ if (null != fileStatus && fileStatus.isDirectory()) {
+ Path path = fileStatus.getPath();
+ listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+ } else {
+ return null;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception occured: " + e.getMessage());
+ return new CarbonFile[0];
+ }
+ return getFiles(listStatus);
+ }
+
+ @Override
+ public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+ CarbonFile[] files = listFiles();
+ if (files != null && files.length >= 1) {
+ List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+ for (int i = 0; i < files.length; i++) {
+ if (fileFilter.accept(files[i])) {
+ fileList.add(files[i]);
+ }
+ }
+ if (fileList.size() >= 1) {
+ return fileList.toArray(new CarbonFile[fileList.size()]);
+ } else {
+ return new CarbonFile[0];
+ }
+ }
+ return files;
+ }
+
+ @Override
+ public CarbonFile getParentFile() {
+ Path parent = fileStatus.getPath().getParent();
+ return null == parent ? null : new HDFSCarbonFile(parent);
+ }
+
+ @Override
+ public boolean renameForce(String changetoName) {
+ FileSystem fs;
+ try {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ if (fs instanceof DistributedFileSystem) {
+ ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
+ org.apache.hadoop.fs.Options.Rename.OVERWRITE);
+ return true;
+ } else {
+ return false;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception occured: " + e.getMessage());
+ return false;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
new file mode 100644
index 0000000..f46aeed
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.filesystem;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.Path;
+
+public class LocalCarbonFile implements CarbonFile {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LocalCarbonFile.class.getName());
+ private File file;
+
+ public LocalCarbonFile(String filePath) {
+ Path pathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+ file = new File(pathWithoutSchemeAndAuthority.toString());
+ }
+
+ public LocalCarbonFile(File file) {
+ this.file = file;
+ }
+
+ @Override public String getAbsolutePath() {
+ return file.getAbsolutePath();
+ }
+
+ @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+ if (!file.isDirectory()) {
+ return null;
+ }
+
+ File[] files = file.listFiles(new FileFilter() {
+
+ @Override public boolean accept(File pathname) {
+ return fileFilter.accept(new LocalCarbonFile(pathname));
+ }
+ });
+
+ if (files == null) {
+ return new CarbonFile[0];
+ }
+
+ CarbonFile[] carbonFiles = new CarbonFile[files.length];
+
+ for (int i = 0; i < carbonFiles.length; i++) {
+ carbonFiles[i] = new LocalCarbonFile(files[i]);
+ }
+
+ return carbonFiles;
+ }
+
+ @Override public String getName() {
+ return file.getName();
+ }
+
+ @Override public boolean isDirectory() {
+ return file.isDirectory();
+ }
+
+ @Override public boolean exists() {
+ return file.exists();
+ }
+
+ @Override public String getCanonicalPath() {
+ try {
+ return file.getCanonicalPath();
+ } catch (IOException e) {
+ LOGGER
+ .error(e, "Exception occured" + e.getMessage());
+ }
+ return null;
+ }
+
+ @Override public CarbonFile getParentFile() {
+ return new LocalCarbonFile(file.getParentFile());
+ }
+
+ @Override public String getPath() {
+ return file.getPath();
+ }
+
+ @Override public long getSize() {
+ return file.length();
+ }
+
+ public boolean renameTo(String changetoName) {
+ return file.renameTo(new File(changetoName));
+ }
+
+ public boolean delete() {
+ return file.delete();
+ }
+
+ @Override public CarbonFile[] listFiles() {
+
+ if (!file.isDirectory()) {
+ return null;
+ }
+ File[] files = file.listFiles();
+ if (files == null) {
+ return new CarbonFile[0];
+ }
+ CarbonFile[] carbonFiles = new CarbonFile[files.length];
+ for (int i = 0; i < carbonFiles.length; i++) {
+ carbonFiles[i] = new LocalCarbonFile(files[i]);
+ }
+
+ return carbonFiles;
+
+ }
+
+ @Override public boolean createNewFile() {
+ try {
+ return file.createNewFile();
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override public long getLastModifiedTime() {
+ return file.lastModified();
+ }
+
+ @Override public boolean setLastModifiedTime(long timestamp) {
+ return file.setLastModified(timestamp);
+ }
+
+ /**
+ * This method will delete the data in file data from a given offset
+ */
+ @Override public boolean truncate(String fileName, long validDataEndOffset) {
+ FileChannel source = null;
+ FileChannel destination = null;
+ boolean fileTruncatedSuccessfully = false;
+ // temporary file name
+ String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+ FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+ try {
+ CarbonFile tempFile = null;
+ // delete temporary file if it already exists at a given path
+ if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
+ tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+ tempFile.delete();
+ }
+ // create new temporary file
+ FileFactory.createNewFile(tempWriteFilePath, fileType);
+ tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+ source = new FileInputStream(fileName).getChannel();
+ destination = new FileOutputStream(tempWriteFilePath).getChannel();
+ long read = destination.transferFrom(source, 0, validDataEndOffset);
+ long totalBytesRead = read;
+ long remaining = validDataEndOffset - totalBytesRead;
+ // read till required data offset is not reached
+ while (remaining > 0) {
+ read = destination.transferFrom(source, totalBytesRead, remaining);
+ totalBytesRead = totalBytesRead + read;
+ remaining = remaining - totalBytesRead;
+ }
+ CarbonUtil.closeStreams(source, destination);
+ // rename the temp file to original file
+ tempFile.renameForce(fileName);
+ fileTruncatedSuccessfully = true;
+ } catch (IOException e) {
+ LOGGER.error("Exception occured while truncating the file " + e.getMessage());
+ } finally {
+ CarbonUtil.closeStreams(source, destination);
+ }
+ return fileTruncatedSuccessfully;
+ }
+
+ /**
+ * This method will be used to check whether a file has been modified or not
+ *
+ * @param fileTimeStamp time to be compared with latest timestamp of file
+ * @param endOffset file length to be compared with current length of file
+ * @return
+ */
+ @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
+ boolean isFileModified = false;
+ if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
+ isFileModified = true;
+ }
+ return isFileModified;
+ }
+
+ @Override public boolean renameForce(String changetoName) {
+ File destFile = new File(changetoName);
+ if (destFile.exists()) {
+ if (destFile.delete()) {
+ return file.renameTo(new File(changetoName));
+ }
+ }
+
+ return file.renameTo(new File(changetoName));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
new file mode 100644
index 0000000..8f11b7a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.store.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+
+public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName());
+
+ public ViewFSCarbonFile(String filePath) {
+ super(filePath);
+ }
+
+ public ViewFSCarbonFile(Path path) {
+ super(path);
+ }
+
+ public ViewFSCarbonFile(FileStatus fileStatus) {
+ super(fileStatus);
+ }
+
+ /**
+ * @param listStatus
+ * @return
+ */
+ private CarbonFile[] getFiles(FileStatus[] listStatus) {
+ if (listStatus == null) {
+ return new CarbonFile[0];
+ }
+ CarbonFile[] files = new CarbonFile[listStatus.length];
+ for (int i = 0; i < files.length; i++) {
+ files[i] = new ViewFSCarbonFile(listStatus[i]);
+ }
+ return files;
+ }
+
+ @Override
+ public CarbonFile[] listFiles() {
+ FileStatus[] listStatus = null;
+ try {
+ if (null != fileStatus && fileStatus.isDirectory()) {
+ Path path = fileStatus.getPath();
+ listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+ } else {
+ return null;
+ }
+ } catch (IOException ex) {
+ LOGGER.error("Exception occured" + ex.getMessage());
+ return new CarbonFile[0];
+ }
+ return getFiles(listStatus);
+ }
+
+ @Override
+ public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+ CarbonFile[] files = listFiles();
+ if (files != null && files.length >= 1) {
+ List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+ for (int i = 0; i < files.length; i++) {
+ if (fileFilter.accept(files[i])) {
+ fileList.add(files[i]);
+ }
+ }
+ if (fileList.size() >= 1) {
+ return fileList.toArray(new CarbonFile[fileList.size()]);
+ } else {
+ return new CarbonFile[0];
+ }
+ }
+ return files;
+ }
+
+ @Override public CarbonFile getParentFile() {
+ Path parent = fileStatus.getPath().getParent();
+ return null == parent ? null : new ViewFSCarbonFile(parent);
+ }
+
+ @Override
+ public boolean renameForce(String changetoName) {
+ FileSystem fs;
+ try {
+ fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+ if (fs instanceof ViewFileSystem) {
+ fs.delete(new Path(changetoName), true);
+ fs.rename(fileStatus.getPath(), new Path(changetoName));
+ return true;
+ } else {
+ return false;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Exception occured" + e.getMessage());
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
new file mode 100644
index 0000000..c9571b2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl;
+
+import org.apache.carbondata.core.datastorage.store.MeasureDataWrapper;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+
/**
 * Immutable {@link MeasureDataWrapper} that simply exposes a fixed array of
 * compressed measure data holders.
 */
public class CompressedDataMeasureDataWrapper implements MeasureDataWrapper {

  // the wrapped holders; the array reference is stored as-is (not copied)
  private final CarbonReadDataHolder[] values;

  /**
   * @param values measure data holders to expose via {@link #getValues()}
   */
  public CompressedDataMeasureDataWrapper(final CarbonReadDataHolder[] values) {
    this.values = values;
  }

  @Override public CarbonReadDataHolder[] getValues() {
    return values;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
new file mode 100644
index 0000000..65c6556
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.store.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+public class DFSFileHolderImpl implements FileHolder {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DFSFileHolderImpl.class.getName());
+ /**
+ * cache to hold filename and its stream
+ */
+ private Map<String, FSDataInputStream> fileNameAndStreamCache;
+
+ public DFSFileHolderImpl() {
+ this.fileNameAndStreamCache =
+ new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ @Override public byte[] readByteArray(String filePath, long offset, int length) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ byte[] byteBffer = read(fileChannel, length, offset);
+ return byteBffer;
+ }
+
+ /**
+ * This method will be used to check whether stream is already present in
+ * cache or not for filepath if not present then create it and then add to
+ * cache, other wise get from cache
+ *
+ * @param filePath fully qualified file path
+ * @return channel
+ */
+ private FSDataInputStream updateCache(String filePath) {
+ FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
+ try {
+ if (null == fileChannel) {
+ Path pt = new Path(filePath);
+ FileSystem fs = FileSystem.get(FileFactory.getConfiguration());
+ fileChannel = fs.open(pt);
+ fileNameAndStreamCache.put(filePath, fileChannel);
+ }
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return fileChannel;
+ }
+
+ /**
+ * This method will be used to read from file based on number of bytes to be read and positon
+ *
+ * @param channel file channel
+ * @param size number of bytes
+ * @param offset position
+ * @return byte buffer
+ */
+ private byte[] read(FSDataInputStream channel, int size, long offset) {
+ byte[] byteBffer = new byte[size];
+ try {
+ channel.seek(offset);
+ channel.readFully(byteBffer);
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return byteBffer;
+ }
+
+ /**
+ * This method will be used to read from file based on number of bytes to be read and positon
+ *
+ * @param channel file channel
+ * @param size number of bytes
+ * @return byte buffer
+ */
+ private byte[] read(FSDataInputStream channel, int size) {
+ byte[] byteBffer = new byte[size];
+ try {
+ channel.readFully(byteBffer);
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return byteBffer;
+ }
+
+ @Override public int readInt(String filePath, long offset) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ int i = -1;
+ try {
+ fileChannel.seek(offset);
+ i = fileChannel.readInt();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+
+ return i;
+ }
+
+ @Override public long readDouble(String filePath, long offset) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ long i = -1;
+ try {
+ fileChannel.seek(offset);
+ i = fileChannel.readLong();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+
+ return i;
+ }
+
+ @Override public void finish() {
+ for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) {
+ try {
+ FSDataInputStream channel = entry.getValue();
+ if (null != channel) {
+ channel.close();
+ }
+ } catch (IOException exception) {
+ LOGGER.error(exception, exception.getMessage());
+ }
+ }
+
+ }
+
+ @Override public byte[] readByteArray(String filePath, int length) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ byte[] byteBffer = read(fileChannel, length);
+ return byteBffer;
+ }
+
+ @Override public long readLong(String filePath, long offset) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ long i = -1;
+ try {
+ fileChannel.seek(offset);
+ i = fileChannel.readLong();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return i;
+ }
+
+ @Override public int readInt(String filePath) {
+ FSDataInputStream fileChannel = updateCache(filePath);
+ int i = -1;
+ try {
+ i = fileChannel.readInt();
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return i;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
new file mode 100644
index 0000000..d537d6e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public final class FileFactory {
+ private static Configuration configuration = null;
+
+ private static FileType storeDefaultFileType = FileType.LOCAL;
+
+ static {
+ String property = CarbonUtil.getCarbonStorePath(null, null);
+ if (property != null) {
+ if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
+ storeDefaultFileType = FileType.HDFS;
+ } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+ storeDefaultFileType = FileType.VIEWFS;
+ }
+ }
+
+ configuration = new Configuration();
+ configuration.addResource(new Path("../core-default.xml"));
+ }
+
+ private FileFactory() {
+
+ }
+
+ public static Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public static FileHolder getFileHolder(FileType fileType) {
+ switch (fileType) {
+ case LOCAL:
+ return new FileHolderImpl();
+ case HDFS:
+ case VIEWFS:
+ return new DFSFileHolderImpl();
+ default:
+ return new FileHolderImpl();
+ }
+ }
+
+ public static FileType getFileType() {
+ String property = CarbonUtil.getCarbonStorePath(null, null);
+ if (property != null) {
+ if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
+ storeDefaultFileType = FileType.HDFS;
+ } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+ storeDefaultFileType = FileType.VIEWFS;
+ }
+ }
+ return storeDefaultFileType;
+ }
+
+ public static FileType getFileType(String path) {
+ if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
+ return FileType.HDFS;
+ } else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+ return FileType.VIEWFS;
+ }
+ return FileType.LOCAL;
+ }
+
+ public static CarbonFile getCarbonFile(String path, FileType fileType) {
+ switch (fileType) {
+ case LOCAL:
+ return new LocalCarbonFile(path);
+ case HDFS:
+ return new HDFSCarbonFile(path);
+ case VIEWFS:
+ return new ViewFSCarbonFile(path);
+ default:
+ return new LocalCarbonFile(path);
+ }
+ }
+
+ public static DataInputStream getDataInputStream(String path, FileType fileType)
+ throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = FileSystem.get(configuration);
+ FSDataInputStream stream = fs.open(pt);
+ return new DataInputStream(new BufferedInputStream(stream));
+ default:
+ return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+ }
+ }
+
+ public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
+ throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = FileSystem.get(configuration);
+ FSDataInputStream stream = fs.open(pt, bufferSize);
+ return new DataInputStream(new BufferedInputStream(stream));
+ default:
+ return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+ }
+ }
+
+ /**
+ * return the datainputStream which is seek to the offset of file
+ *
+ * @param path
+ * @param fileType
+ * @param bufferSize
+ * @param offset
+ * @return DataInputStream
+ * @throws IOException
+ */
+ public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
+ long offset) throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = FileSystem.get(configuration);
+ FSDataInputStream stream = fs.open(pt, bufferSize);
+ stream.seek(offset);
+ return new DataInputStream(new BufferedInputStream(stream));
+ default:
+ FileInputStream fis = new FileInputStream(path);
+ long actualSkipSize = 0;
+ long skipSize = offset;
+ while (actualSkipSize != offset) {
+ actualSkipSize += fis.skip(skipSize);
+ skipSize = skipSize - actualSkipSize;
+ }
+ return new DataInputStream(new BufferedInputStream(fis));
+ }
+ }
+
+ public static DataOutputStream getDataOutputStream(String path, FileType fileType)
+ throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = pt.getFileSystem(configuration);
+ FSDataOutputStream stream = fs.create(pt, true);
+ return stream;
+ default:
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+ }
+ }
+
+ public static DataOutputStream getDataOutputStream(String path, FileType fileType,
+ short replicationFactor) throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = pt.getFileSystem(configuration);
+ FSDataOutputStream stream = fs.create(pt, replicationFactor);
+ return stream;
+ default:
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+ }
+ }
+
+ public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize)
+ throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = pt.getFileSystem(configuration);
+ FSDataOutputStream stream = fs.create(pt, true, bufferSize);
+ return stream;
+ default:
+ return new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+ }
+ }
+
+ public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
+ boolean append) throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = pt.getFileSystem(configuration);
+ FSDataOutputStream stream = null;
+ if (append) {
+ // append to a file only if file already exists else file not found
+ // exception will be thrown by hdfs
+ if (CarbonUtil.isFileExists(path)) {
+ stream = fs.append(pt, bufferSize);
+ } else {
+ stream = fs.create(pt, true, bufferSize);
+ }
+ } else {
+ stream = fs.create(pt, true, bufferSize);
+ }
+ return stream;
+ default:
+ return new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+ }
+ }
+
+ public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
+ long blockSize) throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = pt.getFileSystem(configuration);
+ FSDataOutputStream stream =
+ fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
+ return stream;
+ default:
+ return new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+ }
+ }
+
+ /**
+ * This method checks the given path exists or not and also is it file or
+ * not if the performFileCheck is true
+ *
+ * @param filePath - Path
+ * @param fileType - FileType Local/HDFS
+ * @param performFileCheck - Provide false for folders, true for files and
+ */
+ public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
+ throws IOException {
+ filePath = filePath.replace("\\", "/");
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ Path path = new Path(filePath);
+ FileSystem fs = path.getFileSystem(configuration);
+ if (performFileCheck) {
+ return fs.exists(path) && fs.isFile(path);
+ } else {
+ return fs.exists(path);
+ }
+
+ case LOCAL:
+ default:
+ File defaultFile = new File(filePath);
+
+ if (performFileCheck) {
+ return defaultFile.exists() && defaultFile.isFile();
+ } else {
+ return defaultFile.exists();
+ }
+ }
+ }
+
+ /**
+ * This method checks the given path exists or not and also is it file or
+ * not if the performFileCheck is true
+ *
+ * @param filePath - Path
+ * @param fileType - FileType Local/HDFS
+ */
+ public static boolean isFileExist(String filePath, FileType fileType) throws IOException {
+ filePath = filePath.replace("\\", "/");
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ Path path = new Path(filePath);
+ FileSystem fs = path.getFileSystem(configuration);
+ return fs.exists(path);
+
+ case LOCAL:
+ default:
+ File defaultFile = new File(filePath);
+ return defaultFile.exists();
+ }
+ }
+
+ public static boolean createNewFile(String filePath, FileType fileType) throws IOException {
+ filePath = filePath.replace("\\", "/");
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ Path path = new Path(filePath);
+ FileSystem fs = path.getFileSystem(configuration);
+ return fs.createNewFile(path);
+
+ case LOCAL:
+ default:
+ File file = new File(filePath);
+ return file.createNewFile();
+ }
+ }
+
+ public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
+ filePath = filePath.replace("\\", "/");
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ Path path = new Path(filePath);
+ FileSystem fs = path.getFileSystem(configuration);
+ return fs.mkdirs(path);
+ case LOCAL:
+ default:
+ File file = new File(filePath);
+ return file.mkdirs();
+ }
+ }
+
+ /**
+ * for getting the dataoutput stream using the hdfs filesystem append API.
+ *
+ * @param path
+ * @param fileType
+ * @return
+ * @throws IOException
+ */
+ public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
+ throws IOException {
+ path = path.replace("\\", "/");
+ switch (fileType) {
+ case LOCAL:
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
+ case HDFS:
+ case VIEWFS:
+ Path pt = new Path(path);
+ FileSystem fs = pt.getFileSystem(configuration);
+ FSDataOutputStream stream = fs.append(pt);
+ return stream;
+ default:
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+ }
+ }
+
+ /**
+ * for creating a new Lock file and if it is successfully created
+ * then in case of abrupt shutdown then the stream to that file will be closed.
+ *
+ * @param filePath
+ * @param fileType
+ * @return
+ * @throws IOException
+ */
+ public static boolean createNewLockFile(String filePath, FileType fileType) throws IOException {
+ filePath = filePath.replace("\\", "/");
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ Path path = new Path(filePath);
+ FileSystem fs = path.getFileSystem(configuration);
+ if (fs.createNewFile(path)) {
+ fs.deleteOnExit(path);
+ return true;
+ }
+ return false;
+ case LOCAL:
+ default:
+ File file = new File(filePath);
+ return file.createNewFile();
+ }
+ }
+
+ public enum FileType {
+ LOCAL, HDFS, VIEWFS
+ }
+
+ /**
+ * below method will be used to update the file path
+ * for local type
+ * it removes the file:/ from the path
+ *
+ * @param filePath
+ * @return updated file path without url for local
+ */
+ public static String getUpdatedFilePath(String filePath) {
+ FileType fileType = getFileType(filePath);
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ return filePath;
+ case LOCAL:
+ default:
+ Path pathWithoutSchemeAndAuthority =
+ Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+ return pathWithoutSchemeAndAuthority.toString();
+ }
+ }
+
+ /**
+ * It computes size of directory
+ *
+ * @param filePath
+ * @return size in bytes
+ * @throws IOException
+ */
+ public static long getDirectorySize(String filePath) throws IOException {
+ FileType fileType = getFileType(filePath);
+ switch (fileType) {
+ case HDFS:
+ case VIEWFS:
+ Path path = new Path(filePath);
+ FileSystem fs = path.getFileSystem(configuration);
+ return fs.getContentSummary(path).getLength();
+ case LOCAL:
+ default:
+ File file = new File(filePath);
+ return FileUtils.sizeOfDirectory(file);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java
new file mode 100644
index 0000000..5fefb7b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+
+public class FileHolderImpl implements FileHolder {
+ /**
+ * Attribute for Carbon LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(FileHolderImpl.class.getName());
+ /**
+ * cache to hold filename and its stream
+ */
+ private Map<String, FileChannel> fileNameAndStreamCache;
+
+ /**
+ * FileHolderImpl Constructor
+ * It will create the cache
+ */
+ public FileHolderImpl() {
+ this.fileNameAndStreamCache =
+ new HashMap<String, FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ public FileHolderImpl(int capacity) {
+ this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity);
+ }
+
+ /**
+ * This method will be used to read the byte array from file based on offset
+ * and length(number of bytes) need to read
+ *
+ * @param filePath fully qualified file path
+ * @param offset reading start position,
+ * @param length number of bytes to be read
+ * @return read byte array
+ */
+ @Override public byte[] readByteArray(String filePath, long offset, int length) {
+ FileChannel fileChannel = updateCache(filePath);
+ ByteBuffer byteBffer = read(fileChannel, length, offset);
+ return byteBffer.array();
+ }
+
+ /**
+ * This method will be used to close all the streams currently present in the cache
+ */
+ @Override public void finish() {
+
+ for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) {
+ try {
+ FileChannel channel = entry.getValue();
+ if (null != channel) {
+ channel.close();
+ }
+ } catch (IOException exception) {
+ LOGGER.error(exception, exception.getMessage());
+ }
+ }
+
+ }
+
+ /**
+ * This method will be used to read int from file from postion(offset), here
+ * length will be always 4 bacause int byte size if 4
+ *
+ * @param filePath fully qualified file path
+ * @param offset reading start position,
+ * @return read int
+ */
+ @Override public int readInt(String filePath, long offset) {
+ FileChannel fileChannel = updateCache(filePath);
+ ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset);
+ return byteBffer.getInt();
+ }
+
+ /**
+ * This method will be used to read int from file from postion(offset), here
+ * length will be always 4 bacause int byte size if 4
+ *
+ * @param filePath fully qualified file path
+ * @return read int
+ */
+ @Override public int readInt(String filePath) {
+ FileChannel fileChannel = updateCache(filePath);
+ ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+ return byteBffer.getInt();
+ }
+
+ /**
+ * This method will be used to read int from file from postion(offset), here
+ * length will be always 4 bacause int byte size if 4
+ *
+ * @param filePath fully qualified file path
+ * @param offset reading start position,
+ * @return read int
+ */
+ @Override public long readDouble(String filePath, long offset) {
+ FileChannel fileChannel = updateCache(filePath);
+ ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
+ return byteBffer.getLong();
+ }
+
+ /**
+ * This method will be used to check whether stream is already present in
+ * cache or not for filepath if not present then create it and then add to
+ * cache, other wise get from cache
+ *
+ * @param filePath fully qualified file path
+ * @return channel
+ */
+ private FileChannel updateCache(String filePath) {
+ FileChannel fileChannel = fileNameAndStreamCache.get(filePath);
+ try {
+ if (null == fileChannel) {
+ FileInputStream stream = new FileInputStream(filePath);
+ fileChannel = stream.getChannel();
+ fileNameAndStreamCache.put(filePath, fileChannel);
+ }
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return fileChannel;
+ }
+
+ /**
+ * This method will be used to read from file based on number of bytes to be read and positon
+ *
+ * @param channel file channel
+ * @param size number of bytes
+ * @param offset position
+ * @return byte buffer
+ */
+ private ByteBuffer read(FileChannel channel, int size, long offset) {
+ ByteBuffer byteBffer = ByteBuffer.allocate(size);
+ try {
+ channel.position(offset);
+ channel.read(byteBffer);
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ byteBffer.rewind();
+ return byteBffer;
+ }
+
+ /**
+ * This method will be used to read from file based on number of bytes to be read and positon
+ *
+ * @param channel file channel
+ * @param size number of bytes
+ * @return byte buffer
+ */
+ private ByteBuffer read(FileChannel channel, int size) {
+ ByteBuffer byteBffer = ByteBuffer.allocate(size);
+ try {
+ channel.read(byteBffer);
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ byteBffer.rewind();
+ return byteBffer;
+ }
+
+
+ /**
+ * This method will be used to read the byte array from file based on length(number of bytes)
+ *
+ * @param filePath fully qualified file path
+ * @param length number of bytes to be read
+ * @return read byte array
+ */
+ @Override public byte[] readByteArray(String filePath, int length) {
+ FileChannel fileChannel = updateCache(filePath);
+ ByteBuffer byteBffer = read(fileChannel, length);
+ return byteBffer.array();
+ }
+
+ /**
+ * This method will be used to read long from file from postion(offset), here
+ * length will be always 8 bacause int byte size is 8
+ *
+ * @param filePath fully qualified file path
+ * @param offset reading start position,
+ * @return read long
+ */
+ @Override public long readLong(String filePath, long offset) {
+ FileChannel fileChannel = updateCache(filePath);
+ ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
+ return byteBffer.getLong();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java
new file mode 100644
index 0000000..98d0039
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.store.impl;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+
+public class MemoryMappedFileHolderImpl implements FileHolder {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(MemoryMappedFileHolderImpl.class.getName());
+
+ private Map<String, FileChannel> fileNameAndStreamCache;
+ private Map<String, MappedByteBuffer> fileNameAndMemoryMappedFileCache;
+
+ public MemoryMappedFileHolderImpl() {
+ this(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ public MemoryMappedFileHolderImpl(int capacity) {
+ this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity);
+ this.fileNameAndMemoryMappedFileCache = new HashMap<String, MappedByteBuffer>(capacity);
+ }
+
+ private MappedByteBuffer updateCache(String filePath) {
+ MappedByteBuffer byteBuffer = fileNameAndMemoryMappedFileCache.get(filePath);
+ try {
+ if (null == byteBuffer) {
+ FileChannel fileChannel = new RandomAccessFile(filePath, "r").getChannel();
+ byteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
+ fileNameAndStreamCache.put(filePath, fileChannel);
+ fileNameAndMemoryMappedFileCache.put(filePath, byteBuffer);
+ }
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ return byteBuffer;
+ }
+
+ @Override
+ public byte[] readByteArray(String filePath, long offset, int length) {
+ byte[] dst = new byte[length];
+ updateCache(filePath).get(dst, (int)offset, length);
+ return dst;
+ }
+
+ @Override
+ public byte[] readByteArray(String filePath, int length) {
+ byte[] dst = new byte[length];
+ updateCache(filePath).get(dst);
+ return dst;
+ }
+
+ @Override
+ public int readInt(String filePath, long offset) {
+ byte[] dst = readByteArray(filePath, offset, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+ return ByteBuffer.wrap(dst).getInt();
+ }
+
+ @Override
+ public long readLong(String filePath, long offset) {
+ byte[] dst = readByteArray(filePath, offset, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+ return ByteBuffer.wrap(dst).getLong();
+ }
+
+ @Override
+ public int readInt(String filePath) {
+ return updateCache(filePath).getInt();
+ }
+
+ @Override
+ public long readDouble(String filePath, long offset) {
+ byte[] dst = readByteArray(filePath, offset, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+ return ByteBuffer.wrap(dst).getLong();
+ }
+
+ @Override
+ public void finish() {
+ fileNameAndMemoryMappedFileCache.clear();
+ for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) {
+ try {
+ FileChannel channel = entry.getValue();
+ if (null != channel) {
+ channel.close();
+ }
+ } catch (IOException exception) {
+ LOGGER.error(exception, exception.getMessage());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
new file mode 100644
index 0000000..21c2a60
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl.data.compressed;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.NodeMeasureDataStore;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+
+public abstract class AbstractHeavyCompressedDoubleArrayDataStore
+ implements NodeMeasureDataStore //NodeMeasureDataStore<double[]>
+{
+
+ /**
+ * values.
+ */
+ protected ValueCompressonHolder.UnCompressValue[] values;
+
+ /**
+ * compressionModel.
+ */
+ protected ValueCompressionModel compressionModel;
+
+ /**
+ * type
+ */
+ private char[] type;
+
+ /**
+ * AbstractHeavyCompressedDoubleArrayDataStore constructor.
+ *
+ * @param compressionModel
+ */
+ public AbstractHeavyCompressedDoubleArrayDataStore(ValueCompressionModel compressionModel) {
+ this.compressionModel = compressionModel;
+ if (null != compressionModel) {
+ this.type = compressionModel.getType();
+ values =
+ new ValueCompressonHolder.UnCompressValue[compressionModel.getUnCompressValues().length];
+ }
+ }
+
+ @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
+ for (int i = 0; i < compressionModel.getUnCompressValues().length; i++) {
+ values[i] = compressionModel.getUnCompressValues()[i].getNew();
+ if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
+ && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ values[i].setValue(ValueCompressionUtil
+ .getCompressedValues(compressionModel.getCompType()[i],
+ dataHolder[i].getWritableLongValues(), compressionModel.getChangedDataType()[i],
+ (long) compressionModel.getMaxValue()[i], compressionModel.getDecimal()[i]));
+ } else {
+ values[i].setValue(ValueCompressionUtil
+ .getCompressedValues(compressionModel.getCompType()[i],
+ dataHolder[i].getWritableDoubleValues(), compressionModel.getChangedDataType()[i],
+ (double) compressionModel.getMaxValue()[i], compressionModel.getDecimal()[i]));
+ }
+ } else {
+ values[i].setValue(dataHolder[i].getWritableByteArrayValues());
+ }
+ values[i] = values[i].compress();
+ }
+ byte[][] returnValue = new byte[values.length][];
+ for (int i = 0; i < values.length; i++) {
+ returnValue[i] = values[i].getBackArrayData();
+ }
+ return returnValue;
+ }
+
+ @Override public short getLength() {
+ return values != null ? (short) values.length : 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataFileStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataFileStore.java
new file mode 100644
index 0000000..2fd873b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataFileStore.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl.data.compressed;
+
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.MeasureDataWrapper;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastorage.store.impl.CompressedDataMeasureDataWrapper;
+
+public class HeavyCompressedDoubleArrayDataFileStore
+ extends AbstractHeavyCompressedDoubleArrayDataStore {
+ /**
+ * measuresOffsetsArray.
+ */
+ private long[] measuresOffsetsArray;
+
+ /**
+ * measuresLengthArray.
+ */
+ private int[] measuresLengthArray;
+
+ /**
+ * fileName.
+ */
+ private String fileName;
+
+ /**
+ * HeavyCompressedDoubleArrayDataFileStore.
+ *
+ * @param compressionModel
+ * @param measuresOffsetsArray
+ * @param measuresLengthArray
+ * @param fileName
+ */
+ public HeavyCompressedDoubleArrayDataFileStore(ValueCompressionModel compressionModel,
+ long[] measuresOffsetsArray, int[] measuresLengthArray, String fileName) {
+ super(compressionModel);
+ if (null != compressionModel) {
+ this.fileName = fileName;
+ this.measuresLengthArray = measuresLengthArray;
+ this.measuresOffsetsArray = measuresOffsetsArray;
+ for (int i = 0; i < values.length; i++) {
+ values[i] = compressionModel.getUnCompressValues()[i].getNew().getCompressorObject();
+ }
+ }
+ }
+
+ @Override public MeasureDataWrapper getBackData(int[] cols, FileHolder fileHolder) {
+ if (null == compressionModel) {
+ return null;
+ }
+ CarbonReadDataHolder[] vals = new CarbonReadDataHolder[values.length];
+
+ if (cols != null) {
+ for (int i = 0; i < cols.length; i++) {
+ ValueCompressonHolder.UnCompressValue copy = values[cols[i]].getNew();
+ copy.setValue(fileHolder
+ .readByteArray(fileName, measuresOffsetsArray[cols[i]], measuresLengthArray[cols[i]]));
+ vals[cols[i]] = copy.uncompress(compressionModel.getChangedDataType()[cols[i]])
+ .getValues(compressionModel.getDecimal()[cols[i]],
+ compressionModel.getMaxValue()[cols[i]]);
+ copy = null;
+ }
+ } else {
+ for (int j = 0; j < vals.length; j++) {
+ ValueCompressonHolder.UnCompressValue copy = values[j].getNew();
+ copy.setValue(
+ fileHolder.readByteArray(fileName, measuresOffsetsArray[j], measuresLengthArray[j]));
+ vals[j] = copy.uncompress(compressionModel.getChangedDataType()[j])
+ .getValues(compressionModel.getDecimal()[j], compressionModel.getMaxValue()[j]);
+ copy = null;
+ }
+ }
+ return new CompressedDataMeasureDataWrapper(vals);
+
+ }
+
+ @Override public MeasureDataWrapper getBackData(int cols, FileHolder fileHolder) {
+ if (null == compressionModel) {
+ return null;
+ }
+ CarbonReadDataHolder[] vals = new CarbonReadDataHolder[values.length];
+ ValueCompressonHolder.UnCompressValue copy = values[cols].getNew();
+ copy.setValue(
+ fileHolder.readByteArray(fileName, measuresOffsetsArray[cols], measuresLengthArray[cols]));
+ vals[cols] = copy.uncompress(compressionModel.getChangedDataType()[cols])
+ .getValues(compressionModel.getDecimal()[cols], compressionModel.getMaxValue()[cols]);
+ return new CompressedDataMeasureDataWrapper(vals);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
new file mode 100644
index 0000000..f726ba7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl.data.compressed;
+
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.MeasureDataWrapper;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastorage.store.impl.CompressedDataMeasureDataWrapper;
+
+public class HeavyCompressedDoubleArrayDataInMemoryStore
+ extends AbstractHeavyCompressedDoubleArrayDataStore {
+
+ public HeavyCompressedDoubleArrayDataInMemoryStore(ValueCompressionModel compressionModel,
+ long[] measuresOffsetsArray, int[] measuresLengthArray, String fileName,
+ FileHolder fileHolder) {
+ super(compressionModel);
+ for (int i = 0; i < measuresLengthArray.length; i++) {
+ values[i] = compressionModel.getUnCompressValues()[i].getCompressorObject();
+ values[i].setValue(
+ fileHolder.readByteArray(fileName, measuresOffsetsArray[i], measuresLengthArray[i]));
+ }
+ }
+
+ public HeavyCompressedDoubleArrayDataInMemoryStore(ValueCompressionModel compressionModel) {
+ super(compressionModel);
+ }
+
+ @Override public MeasureDataWrapper getBackData(int[] cols, FileHolder fileHolder) {
+ if (null == compressionModel) {
+ return null;
+ }
+ CarbonReadDataHolder[] vals = new CarbonReadDataHolder[values.length];
+ if (cols != null) {
+ for (int i = 0; i < cols.length; i++) {
+ vals[cols[i]] = values[cols[i]].uncompress(compressionModel.getChangedDataType()[cols[i]])
+ .getValues(compressionModel.getDecimal()[cols[i]],
+ compressionModel.getMaxValue()[cols[i]]);
+ }
+ } else {
+ for (int i = 0; i < vals.length; i++) {
+
+ vals[i] = values[i].uncompress(compressionModel.getChangedDataType()[i])
+ .getValues(compressionModel.getDecimal()[i], compressionModel.getMaxValue()[i]);
+ }
+ }
+ return new CompressedDataMeasureDataWrapper(vals);
+ }
+
+ @Override public MeasureDataWrapper getBackData(int cols, FileHolder fileHolder) {
+ if (null == compressionModel) {
+ return null;
+ }
+ CarbonReadDataHolder[] vals = new CarbonReadDataHolder[values.length];
+ vals[cols] = values[cols].uncompress(compressionModel.getChangedDataType()[cols])
+ .getValues(compressionModel.getDecimal()[cols], compressionModel.getMaxValue()[cols]);
+ return new CompressedDataMeasureDataWrapper(vals);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/AbstractDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/AbstractDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/AbstractDoubleArrayDataStore.java
new file mode 100644
index 0000000..8271e43
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/AbstractDoubleArrayDataStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl.data.uncompressed;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.NodeMeasureDataStore;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+
+public abstract class AbstractDoubleArrayDataStore implements NodeMeasureDataStore {
+
+ protected ValueCompressonHolder.UnCompressValue[] values;
+
+ protected ValueCompressionModel compressionModel;
+
+ private char[] type;
+
+ public AbstractDoubleArrayDataStore(ValueCompressionModel compressionModel) {
+ this.compressionModel = compressionModel;
+ if (null != compressionModel) {
+ values =
+ new ValueCompressonHolder.UnCompressValue[compressionModel.getUnCompressValues().length];
+ type = compressionModel.getType();
+ }
+ }
+
+ @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
+ values =
+ new ValueCompressonHolder.UnCompressValue[compressionModel.getUnCompressValues().length];
+ for (int i = 0; i < compressionModel.getUnCompressValues().length; i++) {
+ values[i] = compressionModel.getUnCompressValues()[i].getNew();
+ if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
+ && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ values[i].setValue(ValueCompressionUtil
+ .getCompressedValues(compressionModel.getCompType()[i],
+ dataHolder[i].getWritableLongValues(), compressionModel.getChangedDataType()[i],
+ (long) compressionModel.getMaxValue()[i], compressionModel.getDecimal()[i]));
+ } else {
+ values[i].setValue(ValueCompressionUtil
+ .getCompressedValues(compressionModel.getCompType()[i],
+ dataHolder[i].getWritableDoubleValues(), compressionModel.getChangedDataType()[i],
+ (double) compressionModel.getMaxValue()[i], compressionModel.getDecimal()[i]));
+ }
+ } else {
+ values[i].setValue(dataHolder[i].getWritableByteArrayValues());
+ }
+ }
+
+ byte[][] resturnValue = new byte[values.length][];
+
+ for (int i = 0; i < values.length; i++) {
+ resturnValue[i] = values[i].getBackArrayData();
+ }
+ return resturnValue;
+ }
+
+ @Override public short getLength() {
+ return values != null ? (short) values.length : 0;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/DoubleArrayDataFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/DoubleArrayDataFileStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/DoubleArrayDataFileStore.java
new file mode 100644
index 0000000..182b868
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/uncompressed/DoubleArrayDataFileStore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.impl.data.uncompressed;
+
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.MeasureDataWrapper;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastorage.store.impl.CompressedDataMeasureDataWrapper;
+
+public class DoubleArrayDataFileStore extends AbstractDoubleArrayDataStore {
+
+ private long[] measuresOffsetsArray;
+
+ private int[] measuresLengthArray;
+
+ private String fileName;
+
+ public DoubleArrayDataFileStore(ValueCompressionModel compressionModel,
+ long[] measuresOffsetsArray, String fileName, int[] measuresLengthArray) {
+ super(compressionModel);
+ this.fileName = fileName;
+ this.measuresLengthArray = measuresLengthArray;
+ this.measuresOffsetsArray = measuresOffsetsArray;
+ }
+
+ @Override public MeasureDataWrapper getBackData(int[] cols, FileHolder fileHolder) {
+ if (null == compressionModel) {
+ return null;
+ }
+ UnCompressValue[] unComp = new UnCompressValue[measuresLengthArray.length];
+ CarbonReadDataHolder[] vals = new CarbonReadDataHolder[measuresLengthArray.length];
+ if (cols != null) {
+ for (int i = 0; i < cols.length; i++) {
+ unComp[cols[i]] = compressionModel.getUnCompressValues()[cols[i]].getNew();
+ unComp[cols[i]].setValueInBytes(fileHolder
+ .readByteArray(fileName, measuresOffsetsArray[cols[i]], measuresLengthArray[cols[i]]));
+ vals[cols[i]] = unComp[cols[i]].getValues(compressionModel.getDecimal()[cols[i]],
+ compressionModel.getMaxValue()[cols[i]]);
+ }
+ } else {
+ for (int i = 0; i < unComp.length; i++) {
+
+ unComp[i] = compressionModel.getUnCompressValues()[i].getNew();
+ unComp[i].setValueInBytes(
+ fileHolder.readByteArray(fileName, measuresOffsetsArray[i], measuresLengthArray[i]));
+ vals[i] = unComp[i]
+ .getValues(compressionModel.getDecimal()[i], compressionModel.getMaxValue()[i]);
+ }
+ }
+ return new CompressedDataMeasureDataWrapper(vals);
+ }
+
+ @Override public MeasureDataWrapper getBackData(int cols, FileHolder fileHolder) {
+ if (null == compressionModel) {
+ return null;
+ }
+ UnCompressValue[] unComp = new UnCompressValue[measuresLengthArray.length];
+ CarbonReadDataHolder[] vals = new CarbonReadDataHolder[measuresLengthArray.length];
+
+ unComp[cols] = compressionModel.getUnCompressValues()[cols].getNew();
+ unComp[cols].setValueInBytes(
+ fileHolder.readByteArray(fileName, measuresOffsetsArray[cols], measuresLengthArray[cols]));
+ vals[cols] = unComp[cols]
+ .getValues(compressionModel.getDecimal()[cols], compressionModel.getMaxValue()[cols]);
+ return new CompressedDataMeasureDataWrapper(vals);
+ }
+}