You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:01 UTC
[16/52] [partial] incubator-carbondata git commit: Renamed packages
to org.apache.carbondata and fixed errors
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneLong.java b/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneLong.java
deleted file mode 100644
index d4b3ce5..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneLong.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.compression.type;
-
-import java.nio.ByteBuffer;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.datastorage.store.compression.Compressor;
-import org.carbondata.core.datastorage.store.compression.SnappyCompression;
-import org.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-import org.carbondata.core.util.ValueCompressionUtil;
-
-public class UnCompressNoneLong implements ValueCompressonHolder.UnCompressValue<long[]> {
- /**
- * Attribute for Carbon LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnCompressNoneLong.class.getName());
- /**
- * longCompressor.
- */
- private static Compressor<long[]> longCompressor =
- SnappyCompression.SnappyLongCompression.INSTANCE;
- /**
- * value.
- */
- protected long[] value;
-
- @Override public void setValue(long[] value) {
- this.value = value;
-
- }
-
- @Override public ValueCompressonHolder.UnCompressValue getNew() {
- try {
- return (ValueCompressonHolder.UnCompressValue) clone();
- } catch (CloneNotSupportedException clnNotSupportedExc) {
- LOGGER.error(clnNotSupportedExc,
- clnNotSupportedExc.getMessage());
- }
- return null;
- }
-
- @Override public ValueCompressonHolder.UnCompressValue compress() {
- UnCompressNoneByte byte1 = new UnCompressNoneByte();
- byte1.setValue(longCompressor.compress(value));
- return byte1;
-
- }
-
- @Override
- public ValueCompressonHolder.UnCompressValue uncompress(ValueCompressionUtil.DataType dType) {
- return null;
- }
-
- @Override public byte[] getBackArrayData() {
- return ValueCompressionUtil.convertToBytes(value);
- }
-
- @Override public void setValueInBytes(byte[] byteValue) {
- ByteBuffer buffer = ByteBuffer.wrap(byteValue);
- this.value = ValueCompressionUtil.convertToLongArray(buffer, byteValue.length);
- }
-
- /**
- * @see ValueCompressonHolder.UnCompressValue#getCompressorObject()
- */
- @Override public ValueCompressonHolder.UnCompressValue getCompressorObject() {
- return new UnCompressNoneByte();
- }
-
- @Override public CarbonReadDataHolder getValues(int decimal, Object maxValueObject) {
- CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
- double[] vals = new double[value.length];
- for (int i = 0; i < vals.length; i++) {
- vals[i] = value[i];
- }
- dataHolder.setReadableDoubleValues(vals);
- return dataHolder;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneShort.java b/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneShort.java
deleted file mode 100644
index d0b6cc2..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/compression/type/UnCompressNoneShort.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.compression.type;
-
-import java.nio.ByteBuffer;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.datastorage.store.compression.Compressor;
-import org.carbondata.core.datastorage.store.compression.SnappyCompression;
-import org.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-import org.carbondata.core.util.ValueCompressionUtil;
-import org.carbondata.core.util.ValueCompressionUtil.DataType;
-
-public class UnCompressNoneShort implements ValueCompressonHolder.UnCompressValue<short[]> {
- /**
- * Attribute for Carbon LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnCompressNoneShort.class.getName());
-
- /**
- * shortCompressor.
- */
- private static Compressor<short[]> shortCompressor =
- SnappyCompression.SnappyShortCompression.INSTANCE;
-
- /**
- * value.
- */
- private short[] shortValue;
-
- @Override public void setValue(short[] shortValue) {
- this.shortValue = shortValue;
-
- }
-
- @Override public ValueCompressonHolder.UnCompressValue getNew() {
- try {
- return (ValueCompressonHolder.UnCompressValue) clone();
- } catch (CloneNotSupportedException cns1) {
- LOGGER.error(cns1, cns1.getMessage());
- }
- return null;
- }
-
- @Override public ValueCompressonHolder.UnCompressValue compress() {
-
- UnCompressNoneByte byte1 = new UnCompressNoneByte();
- byte1.setValue(shortCompressor.compress(shortValue));
-
- return byte1;
-
- }
-
- @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType) {
- return null;
- }
-
- @Override public byte[] getBackArrayData() {
- return ValueCompressionUtil.convertToBytes(shortValue);
- }
-
- @Override public void setValueInBytes(byte[] value) {
- ByteBuffer buffer = ByteBuffer.wrap(value);
- shortValue = ValueCompressionUtil.convertToShortArray(buffer, value.length);
- }
-
- /**
- * @see ValueCompressonHolder.UnCompressValue#getCompressorObject()
- */
- @Override public ValueCompressonHolder.UnCompressValue getCompressorObject() {
- return new UnCompressNoneByte();
- }
-
- @Override public CarbonReadDataHolder getValues(int decimal, Object maxValueObject) {
- CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
- double[] vals = new double[shortValue.length];
- for (int i = 0; i < vals.length; i++) {
- vals[i] = shortValue[i];
- }
- dataHolder.setReadableDoubleValues(vals);
- return dataHolder;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java b/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
deleted file mode 100644
index a297b0b..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.dataholder;
-
-import java.math.BigDecimal;
-
-public class CarbonReadDataHolder {
-
- /**
- * doubleValues
- */
- private double[] doubleValues;
-
- /**
- * longValues
- */
- private long[] longValues;
-
- /**
- * bigDecimalValues
- */
- private BigDecimal[] bigDecimalValues;
-
- /**
- * byteValues
- */
- private byte[][] byteValues;
-
- /**
- * @return the doubleValues
- */
- public double[] getReadableDoubleValues() {
- return doubleValues;
- }
-
- /**
- * @param doubleValues the doubleValues to set
- */
- public void setReadableDoubleValues(double[] doubleValues) {
- this.doubleValues = doubleValues;
- }
-
- /**
- * @return the byteValues
- */
- public byte[][] getReadableByteArrayValues() {
- return byteValues;
- }
-
- /**
- * @param longValues the longValues to set
- */
- public void setReadableLongValues(long[] longValues) {
- this.longValues = longValues;
- }
-
- /**
- * @param longValues the bigDecimalValues to set
- */
- public void setReadableBigDecimalValues(BigDecimal[] bigDecimalValues) {
- this.bigDecimalValues = bigDecimalValues;
- }
-
- /**
- * @param byteValues the byteValues to set
- */
- public void setReadableByteValues(byte[][] byteValues) {
- this.byteValues = byteValues;
- }
-
- /**
- * below method will be used to get the double value by index
- *
- * @param index
- * @return double values
- */
- public double getReadableDoubleValueByIndex(int index) {
- return this.doubleValues[index];
- }
-
- public long getReadableLongValueByIndex(int index) {
- return this.longValues[index];
- }
-
- public BigDecimal getReadableBigDecimalValueByIndex(int index) {
- return this.bigDecimalValues[index];
- }
-
- /**
- * below method will be used to get the readable byte array value by index
- *
- * @param index
- * @return byte array value
- */
- public byte[] getReadableByteArrayValueByIndex(int index) {
- return this.byteValues[index];
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
deleted file mode 100644
index 0cc974c..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.dataholder;
-
-public class CarbonWriteDataHolder {
- /**
- * doubleValues
- */
- private double[] doubleValues;
-
- /**
- * longValues
- */
- private long[] longValues;
-
- /**
- * byteValues
- */
- private byte[][] byteValues;
-
- /**
- * byteValues
- */
- private byte[][][] columnByteValues;
-
- /**
- * size
- */
- private int size;
-
- /**
- * totalSize
- */
- private int totalSize;
-
- /**
- * Method to initialise double array
- *
- * @param size
- */
- public void initialiseDoubleValues(int size) {
- if (size < 1) {
- throw new IllegalArgumentException("Invalid array size");
- }
- doubleValues = new double[size];
- }
-
- public void reset() {
- size = 0;
- totalSize = 0;
- }
-
- /**
- * Method to initialise double array
- *
- * @param size
- */
- public void initialiseByteArrayValues(int size) {
- if (size < 1) {
- throw new IllegalArgumentException("Invalid array size");
- }
-
- byteValues = new byte[size][];
- columnByteValues = new byte[size][][];
- }
-
- /**
- * Method to initialise long array
- *
- * @param size
- */
- public void initialiseLongValues(int size) {
- if (size < 1) {
- throw new IllegalArgumentException("Invalid array size");
- }
- longValues = new long[size];
- }
-
- /**
- * set double value by index
- *
- * @param index
- * @param value
- */
- public void setWritableDoubleValueByIndex(int index, Object value) {
- doubleValues[index] = (Double) value;
- size++;
- }
-
- /**
- * set double value by index
- *
- * @param index
- * @param value
- */
- public void setWritableLongValueByIndex(int index, Object value) {
- longValues[index] = (Long) value;
- size++;
- }
-
- /**
- * set byte array value by index
- *
- * @param index
- * @param value
- */
- public void setWritableByteArrayValueByIndex(int index, byte[] value) {
- byteValues[index] = value;
- size++;
- if (null != value) totalSize += value.length;
- }
-
- /**
- * set byte array value by index
- */
- public void setWritableByteArrayValueByIndex(int index, int mdKeyIndex, Object[] columnData) {
- int l = 0;
- columnByteValues[index] = new byte[columnData.length - (mdKeyIndex + 1)][];
- for (int i = mdKeyIndex + 1; i < columnData.length; i++) {
- columnByteValues[index][l++] = (byte[]) columnData[i];
- }
- }
-
- /**
- * Get Writable Double Values
- */
- public double[] getWritableDoubleValues() {
- if (size < doubleValues.length) {
- double[] temp = new double[size];
- System.arraycopy(doubleValues, 0, temp, 0, size);
- doubleValues = temp;
- }
- return doubleValues;
- }
-
- /**
- * Get writable byte array values
- */
- public byte[] getWritableByteArrayValues() {
- byte[] temp = new byte[totalSize];
- int startIndexToCopy = 0;
- for (int i = 0; i < size; i++) {
- System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length);
- startIndexToCopy += byteValues[i].length;
- }
- return temp;
- }
-
- public byte[][] getByteArrayValues() {
- if (size < byteValues.length) {
- byte[][] temp = new byte[size][];
- System.arraycopy(byteValues, 0, temp, 0, size);
- byteValues = temp;
- }
- return byteValues;
- }
-
- /**
- * Get Writable Double Values
- *
- * @return
- */
- public long[] getWritableLongValues() {
- if (size < longValues.length) {
- long[] temp = new long[size];
- System.arraycopy(longValues, 0, temp, 0, size);
- longValues = temp;
- }
- return longValues;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
deleted file mode 100644
index b04cd47..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/AbstractDFSCarbonFile.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.filesystem;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.util.CarbonUtil;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public abstract class AbstractDFSCarbonFile implements CarbonFile {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName());
- protected FileStatus fileStatus;
- protected FileSystem fs;
-
- public AbstractDFSCarbonFile(String filePath) {
- filePath = filePath.replace("\\", "/");
- Path path = new Path(filePath);
- try {
- fs = path.getFileSystem(FileFactory.getConfiguration());
- fileStatus = fs.getFileStatus(path);
- } catch (IOException e) {
- LOGGER.error("Exception occured:" + e.getMessage());
- }
- }
-
- public AbstractDFSCarbonFile(Path path) {
- try {
- fs = path.getFileSystem(FileFactory.getConfiguration());
- fileStatus = fs.getFileStatus(path);
- } catch (IOException e) {
- LOGGER.error("Exception occured:" + e.getMessage());
- }
- }
-
- public AbstractDFSCarbonFile(FileStatus fileStatus) {
- this.fileStatus = fileStatus;
- }
-
- @Override public boolean createNewFile() {
- Path path = fileStatus.getPath();
- try {
- return fs.createNewFile(path);
- } catch (IOException e) {
- return false;
- }
- }
-
- @Override public String getAbsolutePath() {
- return fileStatus.getPath().toString();
- }
-
- @Override public String getName() {
- return fileStatus.getPath().getName();
- }
-
- @Override public boolean isDirectory() {
- return fileStatus.isDirectory();
- }
-
- @Override public boolean exists() {
- try {
- if (null != fileStatus) {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- return fs.exists(fileStatus.getPath());
- }
- } catch (IOException e) {
- LOGGER.error("Exception occured:" + e.getMessage());
- }
- return false;
- }
-
- @Override public String getCanonicalPath() {
- return getAbsolutePath();
- }
-
- @Override public String getPath() {
- return getAbsolutePath();
- }
-
- @Override public long getSize() {
- return fileStatus.getLen();
- }
-
- public boolean renameTo(String changetoName) {
- FileSystem fs;
- try {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- return fs.rename(fileStatus.getPath(), new Path(changetoName));
- } catch (IOException e) {
- LOGGER.error("Exception occured:" + e.getMessage());
- return false;
- }
- }
-
- public boolean delete() {
- FileSystem fs;
- try {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- return fs.delete(fileStatus.getPath(), true);
- } catch (IOException e) {
- LOGGER.error("Exception occured:" + e.getMessage());
- return false;
- }
- }
-
- @Override public long getLastModifiedTime() {
- return fileStatus.getModificationTime();
- }
-
- @Override public boolean setLastModifiedTime(long timestamp) {
- try {
- fs.setTimes(fileStatus.getPath(), timestamp, timestamp);
- } catch (IOException e) {
- return false;
- }
- return true;
- }
-
- /**
- * This method will delete the data in file data from a given offset
- */
- @Override public boolean truncate(String fileName, long validDataEndOffset) {
- DataOutputStream dataOutputStream = null;
- DataInputStream dataInputStream = null;
- boolean fileTruncatedSuccessfully = false;
- // if bytes to read less than 1024 then buffer size should be equal to the given offset
- int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
- CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
- (int) validDataEndOffset;
- // temporary file name
- String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
- FileFactory.FileType fileType = FileFactory.getFileType(fileName);
- try {
- CarbonFile tempFile = null;
- // delete temporary file if it already exists at a given path
- if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
- tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
- tempFile.delete();
- }
- // create new temporary file
- FileFactory.createNewFile(tempWriteFilePath, fileType);
- tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
- byte[] buff = new byte[bufferSize];
- dataInputStream = FileFactory.getDataInputStream(fileName, fileType);
- // read the data
- int read = dataInputStream.read(buff, 0, buff.length);
- dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
- dataOutputStream.write(buff, 0, read);
- long remaining = validDataEndOffset - read;
- // anytime we should not cross the offset to be read
- while (remaining > 0) {
- if (remaining > bufferSize) {
- buff = new byte[bufferSize];
- } else {
- buff = new byte[(int) remaining];
- }
- read = dataInputStream.read(buff, 0, buff.length);
- dataOutputStream.write(buff, 0, read);
- remaining = remaining - read;
- }
- CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
- // rename the temp file to original file
- tempFile.renameForce(fileName);
- fileTruncatedSuccessfully = true;
- } catch (IOException e) {
- LOGGER.error("Exception occured while truncating the file " + e.getMessage());
- } finally {
- CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
- }
- return fileTruncatedSuccessfully;
- }
-
- /**
- * This method will be used to check whether a file has been modified or not
- *
- * @param fileTimeStamp time to be compared with latest timestamp of file
- * @param endOffset file length to be compared with current length of file
- * @return
- */
- @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
- boolean isFileModified = false;
- if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
- isFileModified = true;
- }
- return isFileModified;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFile.java
deleted file mode 100644
index 8fab676..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFile.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.filesystem;
-
-public interface CarbonFile {
-
- String getAbsolutePath();
-
- CarbonFile[] listFiles(CarbonFileFilter fileFilter);
-
- CarbonFile[] listFiles();
-
- String getName();
-
- boolean isDirectory();
-
- boolean exists();
-
- String getCanonicalPath();
-
- CarbonFile getParentFile();
-
- String getPath();
-
- long getSize();
-
- boolean renameTo(String changetoName);
-
- boolean renameForce(String changetoName);
-
- boolean delete();
-
- boolean createNewFile();
-
- long getLastModifiedTime();
-
- boolean setLastModifiedTime(long timestamp);
-
- boolean truncate(String fileName, long validDataEndOffset);
-
- /**
- * This method will be used to check whether a file has been modified or not
- *
- * @param fileTimeStamp time to be compared with latest timestamp of file
- * @param endOffset file length to be compared with current length of file
- * @return
- */
- boolean isFileModified(long fileTimeStamp, long endOffset);
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
deleted file mode 100644
index 4dbfbd3..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.filesystem;
-
-public interface CarbonFileFilter {
- boolean accept(CarbonFile file);
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
deleted file mode 100644
index 2df5a82..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.filesystem;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-
-public class HDFSCarbonFile extends AbstractDFSCarbonFile {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());
-
- public HDFSCarbonFile(String filePath) {
- super(filePath);
- }
-
- public HDFSCarbonFile(Path path) {
- super(path);
- }
-
- public HDFSCarbonFile(FileStatus fileStatus) {
- super(fileStatus);
- }
-
- /**
- * @param listStatus
- * @return
- */
- private CarbonFile[] getFiles(FileStatus[] listStatus) {
- if (listStatus == null) {
- return new CarbonFile[0];
- }
- CarbonFile[] files = new CarbonFile[listStatus.length];
- for (int i = 0; i < files.length; i++) {
- files[i] = new HDFSCarbonFile(listStatus[i]);
- }
- return files;
- }
-
- @Override
- public CarbonFile[] listFiles() {
- FileStatus[] listStatus = null;
- try {
- if (null != fileStatus && fileStatus.isDirectory()) {
- Path path = fileStatus.getPath();
- listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
- } else {
- return null;
- }
- } catch (IOException e) {
- LOGGER.error("Exception occured: " + e.getMessage());
- return new CarbonFile[0];
- }
- return getFiles(listStatus);
- }
-
- @Override
- public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
- CarbonFile[] files = listFiles();
- if (files != null && files.length >= 1) {
- List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
- for (int i = 0; i < files.length; i++) {
- if (fileFilter.accept(files[i])) {
- fileList.add(files[i]);
- }
- }
- if (fileList.size() >= 1) {
- return fileList.toArray(new CarbonFile[fileList.size()]);
- } else {
- return new CarbonFile[0];
- }
- }
- return files;
- }
-
- @Override
- public CarbonFile getParentFile() {
- Path parent = fileStatus.getPath().getParent();
- return null == parent ? null : new HDFSCarbonFile(parent);
- }
-
- @Override
- public boolean renameForce(String changetoName) {
- FileSystem fs;
- try {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- if (fs instanceof DistributedFileSystem) {
- ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
- org.apache.hadoop.fs.Options.Rename.OVERWRITE);
- return true;
- } else {
- return false;
- }
- } catch (IOException e) {
- LOGGER.error("Exception occured: " + e.getMessage());
- return false;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
deleted file mode 100644
index 55f20de..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.filesystem;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.util.CarbonUtil;
-
-import org.apache.hadoop.fs.Path;
-
-public class LocalCarbonFile implements CarbonFile {
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(LocalCarbonFile.class.getName());
- private File file;
-
- public LocalCarbonFile(String filePath) {
- Path pathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
- file = new File(pathWithoutSchemeAndAuthority.toString());
- }
-
- public LocalCarbonFile(File file) {
- this.file = file;
- }
-
- @Override public String getAbsolutePath() {
- return file.getAbsolutePath();
- }
-
- @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
- if (!file.isDirectory()) {
- return null;
- }
-
- File[] files = file.listFiles(new FileFilter() {
-
- @Override public boolean accept(File pathname) {
- return fileFilter.accept(new LocalCarbonFile(pathname));
- }
- });
-
- if (files == null) {
- return new CarbonFile[0];
- }
-
- CarbonFile[] carbonFiles = new CarbonFile[files.length];
-
- for (int i = 0; i < carbonFiles.length; i++) {
- carbonFiles[i] = new LocalCarbonFile(files[i]);
- }
-
- return carbonFiles;
- }
-
- @Override public String getName() {
- return file.getName();
- }
-
- @Override public boolean isDirectory() {
- return file.isDirectory();
- }
-
- @Override public boolean exists() {
- return file.exists();
- }
-
- @Override public String getCanonicalPath() {
- try {
- return file.getCanonicalPath();
- } catch (IOException e) {
- LOGGER
- .error(e, "Exception occured" + e.getMessage());
- }
- return null;
- }
-
- @Override public CarbonFile getParentFile() {
- return new LocalCarbonFile(file.getParentFile());
- }
-
- @Override public String getPath() {
- return file.getPath();
- }
-
- @Override public long getSize() {
- return file.length();
- }
-
- public boolean renameTo(String changetoName) {
- return file.renameTo(new File(changetoName));
- }
-
- public boolean delete() {
- return file.delete();
- }
-
- @Override public CarbonFile[] listFiles() {
-
- if (!file.isDirectory()) {
- return null;
- }
- File[] files = file.listFiles();
- if (files == null) {
- return new CarbonFile[0];
- }
- CarbonFile[] carbonFiles = new CarbonFile[files.length];
- for (int i = 0; i < carbonFiles.length; i++) {
- carbonFiles[i] = new LocalCarbonFile(files[i]);
- }
-
- return carbonFiles;
-
- }
-
- @Override public boolean createNewFile() {
- try {
- return file.createNewFile();
- } catch (IOException e) {
- return false;
- }
- }
-
- @Override public long getLastModifiedTime() {
- return file.lastModified();
- }
-
- @Override public boolean setLastModifiedTime(long timestamp) {
- return file.setLastModified(timestamp);
- }
-
- /**
- * This method will delete the data in file data from a given offset
- */
- @Override public boolean truncate(String fileName, long validDataEndOffset) {
- FileChannel source = null;
- FileChannel destination = null;
- boolean fileTruncatedSuccessfully = false;
- // temporary file name
- String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
- FileFactory.FileType fileType = FileFactory.getFileType(fileName);
- try {
- CarbonFile tempFile = null;
- // delete temporary file if it already exists at a given path
- if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
- tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
- tempFile.delete();
- }
- // create new temporary file
- FileFactory.createNewFile(tempWriteFilePath, fileType);
- tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
- source = new FileInputStream(fileName).getChannel();
- destination = new FileOutputStream(tempWriteFilePath).getChannel();
- long read = destination.transferFrom(source, 0, validDataEndOffset);
- long totalBytesRead = read;
- long remaining = validDataEndOffset - totalBytesRead;
- // read till required data offset is not reached
- while (remaining > 0) {
- read = destination.transferFrom(source, totalBytesRead, remaining);
- totalBytesRead = totalBytesRead + read;
- remaining = remaining - totalBytesRead;
- }
- CarbonUtil.closeStreams(source, destination);
- // rename the temp file to original file
- tempFile.renameForce(fileName);
- fileTruncatedSuccessfully = true;
- } catch (IOException e) {
- LOGGER.error("Exception occured while truncating the file " + e.getMessage());
- } finally {
- CarbonUtil.closeStreams(source, destination);
- }
- return fileTruncatedSuccessfully;
- }
-
- /**
- * This method will be used to check whether a file has been modified or not
- *
- * @param fileTimeStamp time to be compared with latest timestamp of file
- * @param endOffset file length to be compared with current length of file
- * @return
- */
- @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
- boolean isFileModified = false;
- if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
- isFileModified = true;
- }
- return isFileModified;
- }
-
- @Override public boolean renameForce(String changetoName) {
- File destFile = new File(changetoName);
- if (destFile.exists()) {
- if (destFile.delete()) {
- return file.renameTo(new File(changetoName));
- }
- }
-
- return file.renameTo(new File(changetoName));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
deleted file mode 100644
index 43c2ef9..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.datastorage.store.filesystem;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.viewfs.ViewFileSystem;
-
-public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName());
-
- public ViewFSCarbonFile(String filePath) {
- super(filePath);
- }
-
- public ViewFSCarbonFile(Path path) {
- super(path);
- }
-
- public ViewFSCarbonFile(FileStatus fileStatus) {
- super(fileStatus);
- }
-
- /**
- * @param listStatus
- * @return
- */
- private CarbonFile[] getFiles(FileStatus[] listStatus) {
- if (listStatus == null) {
- return new CarbonFile[0];
- }
- CarbonFile[] files = new CarbonFile[listStatus.length];
- for (int i = 0; i < files.length; i++) {
- files[i] = new ViewFSCarbonFile(listStatus[i]);
- }
- return files;
- }
-
- @Override
- public CarbonFile[] listFiles() {
- FileStatus[] listStatus = null;
- try {
- if (null != fileStatus && fileStatus.isDirectory()) {
- Path path = fileStatus.getPath();
- listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
- } else {
- return null;
- }
- } catch (IOException ex) {
- LOGGER.error("Exception occured" + ex.getMessage());
- return new CarbonFile[0];
- }
- return getFiles(listStatus);
- }
-
- @Override
- public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
- CarbonFile[] files = listFiles();
- if (files != null && files.length >= 1) {
- List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
- for (int i = 0; i < files.length; i++) {
- if (fileFilter.accept(files[i])) {
- fileList.add(files[i]);
- }
- }
- if (fileList.size() >= 1) {
- return fileList.toArray(new CarbonFile[fileList.size()]);
- } else {
- return new CarbonFile[0];
- }
- }
- return files;
- }
-
- @Override public CarbonFile getParentFile() {
- Path parent = fileStatus.getPath().getParent();
- return null == parent ? null : new ViewFSCarbonFile(parent);
- }
-
- @Override
- public boolean renameForce(String changetoName) {
- FileSystem fs;
- try {
- fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
- if (fs instanceof ViewFileSystem) {
- fs.delete(new Path(changetoName), true);
- fs.rename(fileStatus.getPath(), new Path(changetoName));
- return true;
- } else {
- return false;
- }
- } catch (IOException e) {
- LOGGER.error("Exception occured" + e.getMessage());
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
deleted file mode 100644
index ed4ec11..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.impl;
-
-import org.carbondata.core.datastorage.store.MeasureDataWrapper;
-import org.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-
-public class CompressedDataMeasureDataWrapper implements MeasureDataWrapper {
-
- private final CarbonReadDataHolder[] values;
-
- public CompressedDataMeasureDataWrapper(final CarbonReadDataHolder[] values) {
- this.values = values;
- }
-
- @Override public CarbonReadDataHolder[] getValues() {
- return values;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
deleted file mode 100644
index 2ffdb5a..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.datastorage.store.impl;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.FileHolder;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-
-public class DFSFileHolderImpl implements FileHolder {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DFSFileHolderImpl.class.getName());
- /**
- * cache to hold filename and its stream
- */
- private Map<String, FSDataInputStream> fileNameAndStreamCache;
-
- public DFSFileHolderImpl() {
- this.fileNameAndStreamCache =
- new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- }
-
- @Override public byte[] readByteArray(String filePath, long offset, int length) {
- FSDataInputStream fileChannel = updateCache(filePath);
- byte[] byteBffer = read(fileChannel, length, offset);
- return byteBffer;
- }
-
- /**
- * This method will be used to check whether stream is already present in
- * cache or not for filepath if not present then create it and then add to
- * cache, other wise get from cache
- *
- * @param filePath fully qualified file path
- * @return channel
- */
- private FSDataInputStream updateCache(String filePath) {
- FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
- try {
- if (null == fileChannel) {
- Path pt = new Path(filePath);
- FileSystem fs = FileSystem.get(FileFactory.getConfiguration());
- fileChannel = fs.open(pt);
- fileNameAndStreamCache.put(filePath, fileChannel);
- }
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return fileChannel;
- }
-
- /**
- * This method will be used to read from file based on number of bytes to be read and positon
- *
- * @param channel file channel
- * @param size number of bytes
- * @param offset position
- * @return byte buffer
- */
- private byte[] read(FSDataInputStream channel, int size, long offset) {
- byte[] byteBffer = new byte[size];
- try {
- channel.seek(offset);
- channel.readFully(byteBffer);
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- return byteBffer;
- }
-
- /**
- * This method will be used to read from file based on number of bytes to be read and positon
- *
- * @param channel file channel
- * @param size number of bytes
- * @return byte buffer
- */
- private byte[] read(FSDataInputStream channel, int size) {
- byte[] byteBffer = new byte[size];
- try {
- channel.readFully(byteBffer);
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- return byteBffer;
- }
-
- @Override public int readInt(String filePath, long offset) {
- FSDataInputStream fileChannel = updateCache(filePath);
- int i = -1;
- try {
- fileChannel.seek(offset);
- i = fileChannel.readInt();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
-
- return i;
- }
-
- @Override public long readDouble(String filePath, long offset) {
- FSDataInputStream fileChannel = updateCache(filePath);
- long i = -1;
- try {
- fileChannel.seek(offset);
- i = fileChannel.readLong();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
-
- return i;
- }
-
- @Override public void finish() {
- for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) {
- try {
- FSDataInputStream channel = entry.getValue();
- if (null != channel) {
- channel.close();
- }
- } catch (IOException exception) {
- LOGGER.error(exception, exception.getMessage());
- }
- }
-
- }
-
- @Override public byte[] readByteArray(String filePath, int length) {
- FSDataInputStream fileChannel = updateCache(filePath);
- byte[] byteBffer = read(fileChannel, length);
- return byteBffer;
- }
-
- @Override public long readLong(String filePath, long offset) {
- FSDataInputStream fileChannel = updateCache(filePath);
- long i = -1;
- try {
- fileChannel.seek(offset);
- i = fileChannel.readLong();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return i;
- }
-
- @Override public int readInt(String filePath) {
- FSDataInputStream fileChannel = updateCache(filePath);
- int i = -1;
- try {
- i = fileChannel.readInt();
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return i;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
deleted file mode 100644
index 8005102..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileFactory.java
+++ /dev/null
@@ -1,477 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.impl;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile;
-import org.carbondata.core.util.CarbonUtil;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public final class FileFactory {
- private static Configuration configuration = null;
-
- private static FileType storeDefaultFileType = FileType.LOCAL;
-
- static {
- String property = CarbonUtil.getCarbonStorePath(null, null);
- if (property != null) {
- if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
- storeDefaultFileType = FileType.HDFS;
- } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
- storeDefaultFileType = FileType.VIEWFS;
- }
- }
-
- configuration = new Configuration();
- configuration.addResource(new Path("../core-default.xml"));
- }
-
- private FileFactory() {
-
- }
-
- public static Configuration getConfiguration() {
- return configuration;
- }
-
- public static FileHolder getFileHolder(FileType fileType) {
- switch (fileType) {
- case LOCAL:
- return new FileHolderImpl();
- case HDFS:
- case VIEWFS:
- return new DFSFileHolderImpl();
- default:
- return new FileHolderImpl();
- }
- }
-
- public static FileType getFileType() {
- String property = CarbonUtil.getCarbonStorePath(null, null);
- if (property != null) {
- if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
- storeDefaultFileType = FileType.HDFS;
- } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
- storeDefaultFileType = FileType.VIEWFS;
- }
- }
- return storeDefaultFileType;
- }
-
- public static FileType getFileType(String path) {
- if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
- return FileType.HDFS;
- } else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
- return FileType.VIEWFS;
- }
- return FileType.LOCAL;
- }
-
- public static CarbonFile getCarbonFile(String path, FileType fileType) {
- switch (fileType) {
- case LOCAL:
- return new LocalCarbonFile(path);
- case HDFS:
- return new HDFSCarbonFile(path);
- case VIEWFS:
- return new ViewFSCarbonFile(path);
- default:
- return new LocalCarbonFile(path);
- }
- }
-
- public static DataInputStream getDataInputStream(String path, FileType fileType)
- throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = FileSystem.get(configuration);
- FSDataInputStream stream = fs.open(pt);
- return new DataInputStream(new BufferedInputStream(stream));
- default:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
- }
- }
-
- public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
- throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = FileSystem.get(configuration);
- FSDataInputStream stream = fs.open(pt, bufferSize);
- return new DataInputStream(new BufferedInputStream(stream));
- default:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
- }
- }
-
- /**
- * return the datainputStream which is seek to the offset of file
- *
- * @param path
- * @param fileType
- * @param bufferSize
- * @param offset
- * @return DataInputStream
- * @throws IOException
- */
- public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
- long offset) throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = FileSystem.get(configuration);
- FSDataInputStream stream = fs.open(pt, bufferSize);
- stream.seek(offset);
- return new DataInputStream(new BufferedInputStream(stream));
- default:
- FileInputStream fis = new FileInputStream(path);
- long actualSkipSize = 0;
- long skipSize = offset;
- while (actualSkipSize != offset) {
- actualSkipSize += fis.skip(skipSize);
- skipSize = skipSize - actualSkipSize;
- }
- return new DataInputStream(new BufferedInputStream(fis));
- }
- }
-
- public static DataOutputStream getDataOutputStream(String path, FileType fileType)
- throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
- FSDataOutputStream stream = fs.create(pt, true);
- return stream;
- default:
- return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
- }
- }
-
- public static DataOutputStream getDataOutputStream(String path, FileType fileType,
- short replicationFactor) throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
- FSDataOutputStream stream = fs.create(pt, replicationFactor);
- return stream;
- default:
- return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
- }
- }
-
- public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize)
- throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(path), bufferSize));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
- FSDataOutputStream stream = fs.create(pt, true, bufferSize);
- return stream;
- default:
- return new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(path), bufferSize));
- }
- }
-
- public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
- boolean append) throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
- FSDataOutputStream stream = null;
- if (append) {
- // append to a file only if file already exists else file not found
- // exception will be thrown by hdfs
- if (CarbonUtil.isFileExists(path)) {
- stream = fs.append(pt, bufferSize);
- } else {
- stream = fs.create(pt, true, bufferSize);
- }
- } else {
- stream = fs.create(pt, true, bufferSize);
- }
- return stream;
- default:
- return new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(path), bufferSize));
- }
- }
-
- public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
- long blockSize) throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(path), bufferSize));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
- FSDataOutputStream stream =
- fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
- return stream;
- default:
- return new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(path), bufferSize));
- }
- }
-
- /**
- * This method checks the given path exists or not and also is it file or
- * not if the performFileCheck is true
- *
- * @param filePath - Path
- * @param fileType - FileType Local/HDFS
- * @param performFileCheck - Provide false for folders, true for files and
- */
- public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
- throws IOException {
- filePath = filePath.replace("\\", "/");
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
- if (performFileCheck) {
- return fs.exists(path) && fs.isFile(path);
- } else {
- return fs.exists(path);
- }
-
- case LOCAL:
- default:
- File defaultFile = new File(filePath);
-
- if (performFileCheck) {
- return defaultFile.exists() && defaultFile.isFile();
- } else {
- return defaultFile.exists();
- }
- }
- }
-
- /**
- * This method checks the given path exists or not and also is it file or
- * not if the performFileCheck is true
- *
- * @param filePath - Path
- * @param fileType - FileType Local/HDFS
- */
- public static boolean isFileExist(String filePath, FileType fileType) throws IOException {
- filePath = filePath.replace("\\", "/");
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
- return fs.exists(path);
-
- case LOCAL:
- default:
- File defaultFile = new File(filePath);
- return defaultFile.exists();
- }
- }
-
- public static boolean createNewFile(String filePath, FileType fileType) throws IOException {
- filePath = filePath.replace("\\", "/");
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
- return fs.createNewFile(path);
-
- case LOCAL:
- default:
- File file = new File(filePath);
- return file.createNewFile();
- }
- }
-
- public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
- filePath = filePath.replace("\\", "/");
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
- return fs.mkdirs(path);
- case LOCAL:
- default:
- File file = new File(filePath);
- return file.mkdirs();
- }
- }
-
- /**
- * for getting the dataoutput stream using the hdfs filesystem append API.
- *
- * @param path
- * @param fileType
- * @return
- * @throws IOException
- */
- public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
- throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
- FSDataOutputStream stream = fs.append(pt);
- return stream;
- default:
- return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
- }
- }
-
- /**
- * for creating a new Lock file and if it is successfully created
- * then in case of abrupt shutdown then the stream to that file will be closed.
- *
- * @param filePath
- * @param fileType
- * @return
- * @throws IOException
- */
- public static boolean createNewLockFile(String filePath, FileType fileType) throws IOException {
- filePath = filePath.replace("\\", "/");
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
- if (fs.createNewFile(path)) {
- fs.deleteOnExit(path);
- return true;
- }
- return false;
- case LOCAL:
- default:
- File file = new File(filePath);
- return file.createNewFile();
- }
- }
-
- public enum FileType {
- LOCAL, HDFS, VIEWFS
- }
-
- /**
- * below method will be used to update the file path
- * for local type
- * it removes the file:/ from the path
- *
- * @param filePath
- * @return updated file path without url for local
- */
- public static String getUpdatedFilePath(String filePath) {
- FileType fileType = getFileType(filePath);
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- return filePath;
- case LOCAL:
- default:
- Path pathWithoutSchemeAndAuthority =
- Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
- return pathWithoutSchemeAndAuthority.toString();
- }
- }
-
- /**
- * It computes size of directory
- *
- * @param filePath
- * @return size in bytes
- * @throws IOException
- */
- public static long getDirectorySize(String filePath) throws IOException {
- FileType fileType = getFileType(filePath);
- switch (fileType) {
- case HDFS:
- case VIEWFS:
- Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
- return fs.getContentSummary(path).getLength();
- case LOCAL:
- default:
- File file = new File(filePath);
- return FileUtils.sizeOfDirectory(file);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileHolderImpl.java
deleted file mode 100644
index 1801498..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/FileHolderImpl.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.impl;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.FileHolder;
-
-public class FileHolderImpl implements FileHolder {
- /**
- * Attribute for Carbon LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(FileHolderImpl.class.getName());
- /**
- * cache to hold filename and its stream
- */
- private Map<String, FileChannel> fileNameAndStreamCache;
-
- /**
- * FileHolderImpl Constructor
- * It will create the cache
- */
- public FileHolderImpl() {
- this.fileNameAndStreamCache =
- new HashMap<String, FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- }
-
- public FileHolderImpl(int capacity) {
- this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity);
- }
-
- /**
- * This method will be used to read the byte array from file based on offset
- * and length(number of bytes) need to read
- *
- * @param filePath fully qualified file path
- * @param offset reading start position,
- * @param length number of bytes to be read
- * @return read byte array
- */
- @Override public byte[] readByteArray(String filePath, long offset, int length) {
- FileChannel fileChannel = updateCache(filePath);
- ByteBuffer byteBffer = read(fileChannel, length, offset);
- return byteBffer.array();
- }
-
- /**
- * This method will be used to close all the streams currently present in the cache
- */
- @Override public void finish() {
-
- for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) {
- try {
- FileChannel channel = entry.getValue();
- if (null != channel) {
- channel.close();
- }
- } catch (IOException exception) {
- LOGGER.error(exception, exception.getMessage());
- }
- }
-
- }
-
- /**
- * This method will be used to read int from file from postion(offset), here
- * length will be always 4 bacause int byte size if 4
- *
- * @param filePath fully qualified file path
- * @param offset reading start position,
- * @return read int
- */
- @Override public int readInt(String filePath, long offset) {
- FileChannel fileChannel = updateCache(filePath);
- ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset);
- return byteBffer.getInt();
- }
-
- /**
- * This method will be used to read int from file from postion(offset), here
- * length will be always 4 bacause int byte size if 4
- *
- * @param filePath fully qualified file path
- * @return read int
- */
- @Override public int readInt(String filePath) {
- FileChannel fileChannel = updateCache(filePath);
- ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE);
- return byteBffer.getInt();
- }
-
- /**
- * This method will be used to read int from file from postion(offset), here
- * length will be always 4 bacause int byte size if 4
- *
- * @param filePath fully qualified file path
- * @param offset reading start position,
- * @return read int
- */
- @Override public long readDouble(String filePath, long offset) {
- FileChannel fileChannel = updateCache(filePath);
- ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
- return byteBffer.getLong();
- }
-
- /**
- * This method will be used to check whether stream is already present in
- * cache or not for filepath if not present then create it and then add to
- * cache, other wise get from cache
- *
- * @param filePath fully qualified file path
- * @return channel
- */
- private FileChannel updateCache(String filePath) {
- FileChannel fileChannel = fileNameAndStreamCache.get(filePath);
- try {
- if (null == fileChannel) {
- FileInputStream stream = new FileInputStream(filePath);
- fileChannel = stream.getChannel();
- fileNameAndStreamCache.put(filePath, fileChannel);
- }
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return fileChannel;
- }
-
- /**
- * This method will be used to read from file based on number of bytes to be read and positon
- *
- * @param channel file channel
- * @param size number of bytes
- * @param offset position
- * @return byte buffer
- */
- private ByteBuffer read(FileChannel channel, int size, long offset) {
- ByteBuffer byteBffer = ByteBuffer.allocate(size);
- try {
- channel.position(offset);
- channel.read(byteBffer);
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- byteBffer.rewind();
- return byteBffer;
- }
-
- /**
- * This method will be used to read from file based on number of bytes to be read and positon
- *
- * @param channel file channel
- * @param size number of bytes
- * @return byte buffer
- */
- private ByteBuffer read(FileChannel channel, int size) {
- ByteBuffer byteBffer = ByteBuffer.allocate(size);
- try {
- channel.read(byteBffer);
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- byteBffer.rewind();
- return byteBffer;
- }
-
-
- /**
- * This method will be used to read the byte array from file based on length(number of bytes)
- *
- * @param filePath fully qualified file path
- * @param length number of bytes to be read
- * @return read byte array
- */
- @Override public byte[] readByteArray(String filePath, int length) {
- FileChannel fileChannel = updateCache(filePath);
- ByteBuffer byteBffer = read(fileChannel, length);
- return byteBffer.array();
- }
-
- /**
- * This method will be used to read long from file from postion(offset), here
- * length will be always 8 bacause int byte size is 8
- *
- * @param filePath fully qualified file path
- * @param offset reading start position,
- * @return read long
- */
- @Override public long readLong(String filePath, long offset) {
- FileChannel fileChannel = updateCache(filePath);
- ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
- return byteBffer.getLong();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java
deleted file mode 100644
index bb3e7a8..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/MemoryMappedFileHolderImpl.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.datastorage.store.impl;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.FileHolder;
-
-public class MemoryMappedFileHolderImpl implements FileHolder {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(MemoryMappedFileHolderImpl.class.getName());
-
- private Map<String, FileChannel> fileNameAndStreamCache;
- private Map<String, MappedByteBuffer> fileNameAndMemoryMappedFileCache;
-
- public MemoryMappedFileHolderImpl() {
- this(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- }
-
- public MemoryMappedFileHolderImpl(int capacity) {
- this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity);
- this.fileNameAndMemoryMappedFileCache = new HashMap<String, MappedByteBuffer>(capacity);
- }
-
- private MappedByteBuffer updateCache(String filePath) {
- MappedByteBuffer byteBuffer = fileNameAndMemoryMappedFileCache.get(filePath);
- try {
- if (null == byteBuffer) {
- FileChannel fileChannel = new RandomAccessFile(filePath, "r").getChannel();
- byteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
- fileNameAndStreamCache.put(filePath, fileChannel);
- fileNameAndMemoryMappedFileCache.put(filePath, byteBuffer);
- }
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- }
- return byteBuffer;
- }
-
- @Override
- public byte[] readByteArray(String filePath, long offset, int length) {
- byte[] dst = new byte[length];
- updateCache(filePath).get(dst, (int)offset, length);
- return dst;
- }
-
- @Override
- public byte[] readByteArray(String filePath, int length) {
- byte[] dst = new byte[length];
- updateCache(filePath).get(dst);
- return dst;
- }
-
- @Override
- public int readInt(String filePath, long offset) {
- byte[] dst = readByteArray(filePath, offset, CarbonCommonConstants.INT_SIZE_IN_BYTE);
- return ByteBuffer.wrap(dst).getInt();
- }
-
- @Override
- public long readLong(String filePath, long offset) {
- byte[] dst = readByteArray(filePath, offset, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
- return ByteBuffer.wrap(dst).getLong();
- }
-
- @Override
- public int readInt(String filePath) {
- return updateCache(filePath).getInt();
- }
-
- @Override
- public long readDouble(String filePath, long offset) {
- byte[] dst = readByteArray(filePath, offset, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
- return ByteBuffer.wrap(dst).getLong();
- }
-
- @Override
- public void finish() {
- fileNameAndMemoryMappedFileCache.clear();
- for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) {
- try {
- FileChannel channel = entry.getValue();
- if (null != channel) {
- channel.close();
- }
- } catch (IOException exception) {
- LOGGER.error(exception, exception.getMessage());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
deleted file mode 100644
index f2fc0dd..0000000
--- a/core/src/main/java/org/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.datastorage.store.impl.data.compressed;
-
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.NodeMeasureDataStore;
-import org.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
-import org.carbondata.core.util.ValueCompressionUtil;
-
-public abstract class AbstractHeavyCompressedDoubleArrayDataStore
- implements NodeMeasureDataStore //NodeMeasureDataStore<double[]>
-{
-
- /**
- * values.
- */
- protected ValueCompressonHolder.UnCompressValue[] values;
-
- /**
- * compressionModel.
- */
- protected ValueCompressionModel compressionModel;
-
- /**
- * type
- */
- private char[] type;
-
- /**
- * AbstractHeavyCompressedDoubleArrayDataStore constructor.
- *
- * @param compressionModel
- */
- public AbstractHeavyCompressedDoubleArrayDataStore(ValueCompressionModel compressionModel) {
- this.compressionModel = compressionModel;
- if (null != compressionModel) {
- this.type = compressionModel.getType();
- values =
- new ValueCompressonHolder.UnCompressValue[compressionModel.getUnCompressValues().length];
- }
- }
-
- @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
- for (int i = 0; i < compressionModel.getUnCompressValues().length; i++) {
- values[i] = compressionModel.getUnCompressValues()[i].getNew();
- if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
- && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
- values[i].setValue(ValueCompressionUtil
- .getCompressedValues(compressionModel.getCompType()[i],
- dataHolder[i].getWritableLongValues(), compressionModel.getChangedDataType()[i],
- (long) compressionModel.getMaxValue()[i], compressionModel.getDecimal()[i]));
- } else {
- values[i].setValue(ValueCompressionUtil
- .getCompressedValues(compressionModel.getCompType()[i],
- dataHolder[i].getWritableDoubleValues(), compressionModel.getChangedDataType()[i],
- (double) compressionModel.getMaxValue()[i], compressionModel.getDecimal()[i]));
- }
- } else {
- values[i].setValue(dataHolder[i].getWritableByteArrayValues());
- }
- values[i] = values[i].compress();
- }
- byte[][] returnValue = new byte[values.length][];
- for (int i = 0; i < values.length; i++) {
- returnValue[i] = values[i].getBackArrayData();
- }
- return returnValue;
- }
-
- @Override public short getLength() {
- return values != null ? (short) values.length : 0;
- }
-
-}