Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:46 UTC
[09/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
new file mode 100644
index 0000000..91a535e
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public abstract class AbstractStorageManager {
+ private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
+
+ protected final TajoConf conf;
+ protected final FileSystem fs;
+ protected final Path tableBaseDir;
+ protected final boolean blocksMetadataEnabled;
+
+ /**
+ * Cache of scanner handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+
+ /**
+ * Cache of appender handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
+
+ /**
+ * Cache of constructors for each class. Pins the classes so that they
+ * cannot be garbage collected while the cache holds a reference.
+ */
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+ new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+ public abstract Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException;
+
+ public abstract Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
+
+ protected AbstractStorageManager(TajoConf conf) throws IOException {
+ this.conf = conf;
+ this.tableBaseDir = TajoConf.getWarehouseDir(conf);
+ this.fs = tableBaseDir.getFileSystem(conf);
+ this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+ if (!this.blocksMetadataEnabled)
+ LOG.warn("Block metadata is not supported ('dfs.datanode.hdfs-blocks-metadata.enabled' is disabled).");
+ }
+
+ public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
+ throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus status = fs.getFileStatus(path);
+ FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
+ return getScanner(meta, schema, fragment);
+ }
+
+ public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
+ return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
+ }
+
+ public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
+ return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target);
+ }
+
+ public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
+ return getScanner(meta, schema, fragment, schema);
+ }
+
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
+ public Path getWarehouseDir() {
+ return this.tableBaseDir;
+ }
+
+ public void delete(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ fs.delete(tablePath, true);
+ }
+
+ public boolean exists(Path path) throws IOException {
+ FileSystem fileSystem = path.getFileSystem(conf);
+ return fileSystem.exists(path);
+ }
+
+ /**
+ * This method deletes only data contained in the given path.
+ *
+ * @param path The path in which data are deleted.
+ * @throws IOException
+ */
+ public void deleteData(Path path) throws IOException {
+ FileSystem fileSystem = path.getFileSystem(conf);
+ FileStatus[] fileLists = fileSystem.listStatus(path);
+ for (FileStatus status : fileLists) {
+ fileSystem.delete(status.getPath(), true);
+ }
+ }
+
+ public Path getTablePath(String tableName) {
+ return new Path(tableBaseDir, tableName);
+ }
+
+ public Appender getAppender(TableMeta meta, Schema schema, Path path)
+ throws IOException {
+ Appender appender;
+
+ Class<? extends FileAppender> appenderClass;
+
+ String handlerName = meta.getStoreType().name().toLowerCase();
+ appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+ if (appenderClass == null) {
+ appenderClass = conf.getClass(
+ String.format("tajo.storage.appender-handler.%s.class",
+ meta.getStoreType().name().toLowerCase()), null,
+ FileAppender.class);
+ APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+ }
+
+ if (appenderClass == null) {
+ throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+ }
+
+ appender = newAppenderInstance(appenderClass, conf, meta, schema, path);
+
+ return appender;
+ }
+
+
+ public TableMeta getTableMeta(Path tablePath) throws IOException {
+ TableMeta meta;
+
+ FileSystem fs = tablePath.getFileSystem(conf);
+ Path tableMetaPath = new Path(tablePath, ".meta");
+ if (!fs.exists(tableMetaPath)) {
+ throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
+ }
+
+ FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
+
+ CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
+ CatalogProtos.TableProto.getDefaultInstance());
+ meta = new TableMeta(tableProto);
+
+ return meta;
+ }
+
+ public FileFragment[] split(String tableName) throws IOException {
+ Path tablePath = new Path(tableBaseDir, tableName);
+ return split(tableName, tablePath, fs.getDefaultBlockSize());
+ }
+
+ public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
+ Path tablePath = new Path(tableBaseDir, tableName);
+ return split(tableName, tablePath, fragmentSize);
+ }
+
+ public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
+ listTablets.add(tablet);
+ }
+
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public FileFragment[] split(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
+ }
+
+ public FileFragment[] split(String tableName, Path tablePath) throws IOException {
+ return split(tableName, tablePath, fs.getDefaultBlockSize());
+ }
+
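+ /**
+ * Splits every file under the given table path into fragments of at most
+ * <code>size</code> bytes. For example, with size = 64MB a 200MB file
+ * yields the fragments [0,64MB), [64MB,128MB), [128MB,192MB), and
+ * [192MB,200MB).
+ */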
+ private FileFragment[] split(String tableName, Path tablePath, long size)
+ throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ long defaultBlockSize = size;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
+ } else {
+ listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
+ }
+ }
+
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+ Path tablePath, long size)
+ throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ long defaultBlockSize = size;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
+ } else {
+ listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
+ }
+ }
+
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public long calculateSize(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ long totalSize = 0;
+
+ if (fs.exists(tablePath)) {
+ totalSize = fs.getContentSummary(tablePath).getLength();
+ }
+
+ return totalSize;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // FileInputFormat Area
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final PathFilter hiddenFileFilter = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Proxy PathFilter that accepts a path only if all filters given in the
+ * constructor do. Used by listStatus() to apply the built-in
+ * hiddenFileFilter together with a user-provided one (if any).
+ */
+ private static class MultiPathFilter implements PathFilter {
+ private List<PathFilter> filters;
+
+ public MultiPathFilter(List<PathFilter> filters) {
+ this.filters = filters;
+ }
+
+ public boolean accept(Path path) {
+ for (PathFilter filter : filters) {
+ if (!filter.accept(path)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * List input directories.
+ * Subclasses may override to, e.g., select only files matching a regular
+ * expression.
+ *
+ * @return list of FileStatus objects
+ * @throws IOException if an input path does not exist or matches no files.
+ */
+ protected List<FileStatus> listStatus(Path path) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ Path[] dirs = new Path[]{path};
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+
+ List<IOException> errors = new ArrayList<IOException>();
+
+ // creates a MultiPathFilter with the hiddenFileFilter and the
+ // user provided one (if any).
+ List<PathFilter> filters = new ArrayList<PathFilter>();
+ filters.add(hiddenFileFilter);
+
+ PathFilter inputFilter = new MultiPathFilter(filters);
+
+ for (int i = 0; i < dirs.length; ++i) {
+ Path p = dirs[i];
+
+ FileSystem fs = p.getFileSystem(conf);
+ FileStatus[] matches = fs.globStatus(p, inputFilter);
+ if (matches == null) {
+ errors.add(new IOException("Input path does not exist: " + p));
+ } else if (matches.length == 0) {
+ errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+ } else {
+ for (FileStatus globStat : matches) {
+ if (globStat.isDirectory()) {
+ for (FileStatus stat : fs.listStatus(globStat.getPath(),
+ inputFilter)) {
+ result.add(stat);
+ }
+ } else {
+ result.add(globStat);
+ }
+ }
+ }
+ }
+
+ if (!errors.isEmpty()) {
+ throw new InvalidInputException(errors);
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
+ /**
+ * Get the lower bound on split size imposed by the format.
+ *
+ * @return the number of bytes of the minimal split for this format
+ */
+ protected long getFormatMinSplitSize() {
+ return 1;
+ }
+
+ /**
+ * Is the given file splittable? Usually true, but if the file is
+ * stream compressed, it will not be.
+ * <p/>
+ * <code>FileInputFormat</code> implementations can override this and return
+ * <code>false</code> to ensure that individual input files are never split up
+ * so that Mappers process entire files.
+ *
+ * @param meta the table meta
+ * @param schema the table schema
+ * @param filename the file name to check
+ * @return true if this file is splittable
+ */
+ protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException {
+ Scanner scanner = getFileScanner(meta, schema, filename);
+ return scanner.isSplittable();
+ }
+
+ @Deprecated
+ protected long computeSplitSize(long blockSize, long minSize,
+ long maxSize) {
+ return Math.max(minSize, Math.min(maxSize, blockSize));
+ }
+
+ @Deprecated
+ private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+ @Deprecated
+ protected int getBlockIndex(BlockLocation[] blkLocations,
+ long offset) {
+ for (int i = 0; i < blkLocations.length; i++) {
+ // is the offset inside this block?
+ if ((blkLocations[i].getOffset() <= offset) &&
+ (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+ return i;
+ }
+ }
+ BlockLocation last = blkLocations[blkLocations.length - 1];
+ long fileLength = last.getOffset() + last.getLength() - 1;
+ throw new IllegalArgumentException("Offset " + offset +
+ " is outside of file (0.." +
+ fileLength + ")");
+ }
+
+ /**
+ * A factory method that makes the split for this class. It can be overridden
+ * by sub-classes to produce sub-types.
+ */
+ protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
+ return new FileFragment(fragmentId, file, start, length);
+ }
+
+ protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
+ int[] diskIds) throws IOException {
+ return new FileFragment(fragmentId, file, blockLocation, diskIds);
+ }
+
+ // For non-splittable files, e.g., a gzip-compressed text file.
+ protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+ BlockLocation[] blkLocations) throws IOException {
+
+ Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+ for (BlockLocation blockLocation : blkLocations) {
+ for (String host : blockLocation.getHosts()) {
+ if (hostsBlockMap.containsKey(host)) {
+ hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+ } else {
+ hostsBlockMap.put(host, 1);
+ }
+ }
+ }
+
+ List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+ @Override
+ public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+ return v1.getValue().compareTo(v2.getValue());
+ }
+ });
+
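+ // After the ascending sort, the hosts holding the most blocks of this
+ // file are at the end of the list; pick them as the fragment's hosts.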
+ String[] hosts = new String[blkLocations[0].getHosts().length];
+
+ for (int i = 0; i < hosts.length; i++) {
+ Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+ hosts[i] = entry.getKey();
+ }
+ return new FileFragment(fragmentId, file, start, length, hosts);
+ }
+
+ /**
+ * Get the maximum split size.
+ *
+ * @return the maximum number of bytes a split can include
+ */
+ @Deprecated
+ public static long getMaxSplitSize() {
+ // TODO - to be configurable
+ return 536870912L;
+ }
+
+ /**
+ * Get the minimum split size
+ *
+ * @return the minimum number of bytes that can be in a split
+ */
+ @Deprecated
+ public static long getMinSplitSize() {
+ // TODO - to be configurable
+ return 67108864L;
+ }
+
+ /**
+ * Get disk ids decoded from the Base64-encoded volume ids.
+ */
+ private int[] getDiskIds(VolumeId[] volumeIds) {
+ int[] diskIds = new int[volumeIds.length];
+ for (int i = 0; i < volumeIds.length; i++) {
+ int diskId = -1;
+ if (volumeIds[i] != null && volumeIds[i].isValid()) {
+ String volumeIdString = volumeIds[i].toString();
+ byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
+
+ if (volumeIdBytes.length == 4) {
+ diskId = Bytes.toInt(volumeIdBytes);
+ } else if (volumeIdBytes.length == 1) {
+ diskId = (int) volumeIdBytes[0]; // support hadoop-2.0.2
+ }
+ }
+ diskIds[i] = diskId;
+ }
+ return diskIds;
+ }
+
+ /**
+ * Generate a map from each host to the set of volume ids of the blocks
+ * it holds for the given fragments.
+ */
+ private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
+ Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+ for (FileFragment frag : frags) {
+ String[] hosts = frag.getHosts();
+ int[] diskIds = frag.getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ Set<Integer> volumeList = volumeMap.get(hosts[i]);
+ if (volumeList == null) {
+ volumeList = new HashSet<Integer>();
+ volumeMap.put(hosts[i], volumeList);
+ }
+
+ if (diskIds.length > 0 && diskIds[i] > -1) {
+ volumeList.add(diskIds[i]);
+ }
+ }
+ }
+
+ return volumeMap;
+ }
+ /**
+ * Generate the list of files and make them into FileFragments.
+ *
+ * @throws IOException
+ */
+ public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
+ // generate splits
+
+ List<FileFragment> splits = new ArrayList<FileFragment>();
+ FileSystem fs = inputPath.getFileSystem(conf);
+ List<FileStatus> files;
+ if (fs.isFile(inputPath)) {
+ files = Lists.newArrayList(fs.getFileStatus(inputPath));
+ } else {
+ files = listStatus(inputPath);
+ }
+ for (FileStatus file : files) {
+ Path path = file.getPath();
+ long length = file.getLen();
+ if (length > 0) {
+ BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ boolean splittable = isSplittable(meta, schema, path);
+ if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+ // disk volume metadata is available
+ BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
+ .getFileBlockStorageLocations(Arrays.asList(blkLocations));
+ if (splittable) {
+ for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+ splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
+ .getVolumeIds())));
+ }
+ } else { // Non splittable
+ long blockSize = blockStorageLocations[0].getLength();
+ if (blockSize >= length) {
+ for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+ splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
+ .getVolumeIds())));
+ }
+ } else {
+ splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
+ }
+ }
+
+ } else {
+ if (splittable) {
+ for (BlockLocation blockLocation : blkLocations) {
+ splits.add(makeSplit(tableName, meta, path, blockLocation, null));
+ }
+ } else { // Non splittable
+ splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
+ }
+ }
+ } else {
+ //for zero length files
+ splits.add(makeSplit(tableName, meta, path, 0, length));
+ }
+ }
+
+ LOG.info("Total # of splits: " + splits.size());
+ return splits;
+ }
+
+ private static class InvalidInputException extends IOException {
+ List<IOException> errors;
+ public InvalidInputException(List<IOException> errors) {
+ this.errors = errors;
+ }
+
+ @Override
+ public String getMessage(){
+ StringBuffer sb = new StringBuffer();
+ int messageLimit = Math.min(errors.size(), 10);
+ for (int i = 0; i < messageLimit ; i ++) {
+ sb.append(errors.get(i).getMessage()).append("\n");
+ }
+
+ if(messageLimit < errors.size())
+ sb.append("skipped .....").append("\n");
+
+ return sb.toString();
+ }
+ }
+
+ private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+ Configuration.class,
+ Schema.class,
+ TableMeta.class,
+ FileFragment.class
+ };
+
+ private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+ Configuration.class,
+ Schema.class,
+ TableMeta.class,
+ Path.class
+ };
+
+ /**
+ * create a scanner instance.
+ */
+ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+ Fragment fragment) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+
+ /**
+ * create an appender instance.
+ */
+ public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema,
+ Path path) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, schema, meta, path});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+}
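
A minimal usage sketch for the class above: opening a scanner on a single
file and counting its tuples. The storage manager instance is taken as a
parameter (how it is obtained is outside this file); Scanner.next()
returning null at end of input follows the CSVScanner implementation later
in this patch.

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;
  import org.apache.tajo.catalog.Schema;
  import org.apache.tajo.catalog.TableMeta;

  public class ScanExample {
    // Reads every tuple of one file through the storage manager.
    public static long countTuples(AbstractStorageManager sm, TableMeta meta,
                                    Schema schema, Path file) throws IOException {
      Scanner scanner = sm.getFileScanner(meta, schema, file);
      scanner.init();
      long rows = 0;
      while (scanner.next() != null) {  // next() returns null at end of input
        rows++;
      }
      scanner.close();
      return rows;
    }
  }
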
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
new file mode 100644
index 0000000..ed6ea34
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.statistics.TableStats;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface Appender extends Closeable {
+
+ void init() throws IOException;
+
+ void addTuple(Tuple t) throws IOException;
+
+ void flush() throws IOException;
+
+ void close() throws IOException;
+
+ void enableStats();
+
+ TableStats getStats();
+}
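
The interface implies a fixed lifecycle: init() once, addTuple() repeatedly,
then flush() and close(). A minimal write-path sketch using only the APIs in
this patch; note that enableStats() must be called before init()
(FileAppender below enforces this).

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;
  import org.apache.tajo.catalog.Schema;
  import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.catalog.statistics.TableStats;

  public class AppendExample {
    // Writes a single tuple and returns the collected statistics.
    static TableStats writeOne(AbstractStorageManager sm, TableMeta meta,
                               Schema schema, Path file, Tuple tuple) throws IOException {
      Appender appender = sm.getAppender(meta, schema, file);
      appender.enableStats();   // must precede init(), or FileAppender throws
      appender.init();
      appender.addTuple(tuple);
      appender.flush();
      appender.close();
      return appender.getStats();  // null when stats are not enabled
    }
  }
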
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
new file mode 100644
index 0000000..ed034be
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class BinarySerializerDeserializer implements SerializerDeserializer {
+
+ static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
+
+ @Override
+ public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+ throws IOException {
+ byte[] bytes;
+ int length = 0;
+ if (datum == null || datum instanceof NullDatum) {
+ return 0;
+ }
+
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ case BIT:
+ case CHAR:
+ bytes = datum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case INT2:
+ length = writeShort(out, datum.asInt2());
+ break;
+ case INT4:
+ length = writeVLong(out, datum.asInt4());
+ break;
+ case INT8:
+ length = writeVLong(out, datum.asInt8());
+ break;
+ case FLOAT4:
+ length = writeFloat(out, datum.asFloat4());
+ break;
+ case FLOAT8:
+ length = writeDouble(out, datum.asFloat8());
+ break;
+ case TEXT: {
+ bytes = datum.asTextBytes();
+ length = datum.size();
+ if (length == 0) {
+ bytes = INVALID_UTF__SINGLE_BYTE;
+ length = INVALID_UTF__SINGLE_BYTE.length;
+ }
+ out.write(bytes, 0, bytes.length);
+ break;
+ }
+ case BLOB:
+ case INET4:
+ case INET6:
+ bytes = datum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case PROTOBUF:
+ ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+ bytes = protobufDatum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case NULL_TYPE:
+ break;
+ default:
+ throw new IOException("Does not support type");
+ }
+ return length;
+ }
+
+ @Override
+ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+ if (length == 0) return NullDatum.get();
+
+ Datum datum;
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ datum = DatumFactory.createBool(bytes[offset]);
+ break;
+ case BIT:
+ datum = DatumFactory.createBit(bytes[offset]);
+ break;
+ case CHAR: {
+ byte[] chars = new byte[length];
+ System.arraycopy(bytes, offset, chars, 0, length);
+ datum = DatumFactory.createChar(chars);
+ break;
+ }
+ case INT2:
+ datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
+ break;
+ case INT4:
+ datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
+ break;
+ case INT8:
+ datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
+ break;
+ case FLOAT4:
+ datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
+ break;
+ case FLOAT8:
+ datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
+ break;
+ case TEXT: {
+ byte[] chars = new byte[length];
+ System.arraycopy(bytes, offset, chars, 0, length);
+
+ if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
+ datum = DatumFactory.createText(new byte[0]);
+ } else {
+ datum = DatumFactory.createText(chars);
+ }
+ break;
+ }
+ case PROTOBUF: {
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
+ Message.Builder builder = factory.newBuilder();
+ builder.mergeFrom(bytes, offset, length);
+ datum = factory.createDatum(builder);
+ break;
+ }
+ case INET4:
+ datum = DatumFactory.createInet4(bytes, offset, length);
+ break;
+ case BLOB:
+ datum = DatumFactory.createBlob(bytes, offset, length);
+ break;
+ default:
+ datum = NullDatum.get();
+ }
+ return datum;
+ }
+
+ private byte[] shortBytes = new byte[2];
+
+ public int writeShort(OutputStream out, short val) throws IOException {
+ shortBytes[0] = (byte) (val >> 8);
+ shortBytes[1] = (byte) val;
+ out.write(shortBytes, 0, 2);
+ return 2;
+ }
+
+ public float toFloat(byte[] bytes, int offset, int length) {
+ Preconditions.checkArgument(length == 4);
+
+ int val = ((bytes[offset] & 0x000000FF) << 24) +
+ ((bytes[offset + 1] & 0x000000FF) << 16) +
+ ((bytes[offset + 2] & 0x000000FF) << 8) +
+ (bytes[offset + 3] & 0x000000FF);
+ return Float.intBitsToFloat(val);
+ }
+
+ private byte[] floatBytes = new byte[4];
+
+ public int writeFloat(OutputStream out, float f) throws IOException {
+ int val = Float.floatToIntBits(f);
+
+ floatBytes[0] = (byte) (val >> 24);
+ floatBytes[1] = (byte) (val >> 16);
+ floatBytes[2] = (byte) (val >> 8);
+ floatBytes[3] = (byte) val;
+ out.write(floatBytes, 0, 4);
+ return floatBytes.length;
+ }
+
+ public double toDouble(byte[] bytes, int offset, int length) {
+ Preconditions.checkArgument(length == 8);
+ long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
+ ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
+ ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
+ ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
+ ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
+ ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
+ ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
+ (long) (bytes[offset + 7] & 0x00000000000000FF);
+ return Double.longBitsToDouble(val);
+ }
+
+ private byte[] doubleBytes = new byte[8];
+
+ public int writeDouble(OutputStream out, double d) throws IOException {
+ long val = Double.doubleToLongBits(d);
+
+ doubleBytes[0] = (byte) (val >> 56);
+ doubleBytes[1] = (byte) (val >> 48);
+ doubleBytes[2] = (byte) (val >> 40);
+ doubleBytes[3] = (byte) (val >> 32);
+ doubleBytes[4] = (byte) (val >> 24);
+ doubleBytes[5] = (byte) (val >> 16);
+ doubleBytes[6] = (byte) (val >> 8);
+ doubleBytes[7] = (byte) val;
+ out.write(doubleBytes, 0, 8);
+ return doubleBytes.length;
+ }
+
+ private byte[] vLongBytes = new byte[9];
+
+ public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
+ if (l >= -112 && l <= 127) {
+ bytes[offset] = (byte) l;
+ return 1;
+ }
+
+ int len = -112;
+ if (l < 0) {
+ l ^= -1L; // take one's complement
+ len = -120;
+ }
+
+ long tmp = l;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ bytes[offset++] = (byte) len;
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
+ }
+ return 1 + len;
+ }
+
+ public int writeVLong(OutputStream out, long l) throws IOException {
+ int len = writeVLongToByteArray(vLongBytes, 0, l);
+ out.write(vLongBytes, 0, len);
+ return len;
+ }
+}
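
The writeVLong()/writeVLongToByteArray() pair above implements a
Hadoop-style variable-length integer format: values in [-112, 127] occupy a
single byte; anything else gets a header byte encoding sign and payload
length, followed by big-endian payload bytes. A worked trace of the code
above, using only JDK classes:

  import java.io.ByteArrayOutputStream;
  import java.io.IOException;

  public class VLongExample {
    public static void main(String[] args) throws IOException {
      BinarySerializerDeserializer serde = new BinarySerializerDeserializer();
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      int n = serde.writeVLong(out, 300L);
      // 300 needs two payload bytes, so the header byte is -114 (0x8E):
      // out.toByteArray() == { (byte) 0x8E, 0x01, 0x2C } and n == 3
      out.reset();
      n = serde.writeVLong(out, -1L);
      // -1 lies in [-112, 127] and is stored as the single byte 0xFF; n == 1
    }
  }
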
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
new file mode 100644
index 0000000..5d05d6f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.*;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.Bytes;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class CSVFile {
+
+ public static final String DELIMITER = "csvfile.delimiter";
+ public static final String NULL = "csvfile.null"; //read only
+ public static final String SERDE = "csvfile.serde";
+ public static final String DELIMITER_DEFAULT = "|";
+ public static final byte LF = '\n';
+ public static final int EOF = -1;
+
+ private static final Log LOG = LogFactory.getLog(CSVFile.class);
+
+ public static class CSVAppender extends FileAppender {
+ private final TableMeta meta;
+ private final Schema schema;
+ private final int columnNum;
+ private final FileSystem fs;
+ private FSDataOutputStream fos;
+ private DataOutputStream outputStream;
+ private CompressionOutputStream deflateFilter;
+ private char delimiter;
+ private TableStatistics stats = null;
+ private Compressor compressor;
+ private CompressionCodecFactory codecFactory;
+ private CompressionCodec codec;
+ private Path compressedPath;
+ private byte[] nullChars;
+ private static final int BUFFER_SIZE = 128 * 1024;
+ private int bufferedBytes = 0;
+ private long pos = 0;
+
+ private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+ private SerializerDeserializer serde;
+
+ public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
+ super(conf, schema, meta, path);
+ this.fs = path.getFileSystem(conf);
+ this.meta = meta;
+ this.schema = schema;
+ this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0);
+ this.columnNum = schema.getColumnNum();
+ String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.toString());
+ }
+
+ String codecName = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
+ if(!StringUtils.isEmpty(codecName)){
+ codecFactory = new CompressionCodecFactory(conf);
+ codec = codecFactory.getCodecByClassName(codecName);
+ compressor = CodecPool.getCompressor(codec);
+ if(compressor != null) compressor.reset(); // the built-in gzip codec returns a null compressor
+
+ String extension = codec.getDefaultExtension();
+ compressedPath = path.suffix(extension);
+
+ if (fs.exists(compressedPath)) {
+ throw new AlreadyExistsStorageException(compressedPath);
+ }
+
+ fos = fs.create(compressedPath);
+ deflateFilter = codec.createOutputStream(fos, compressor);
+ outputStream = new DataOutputStream(deflateFilter);
+
+ } else {
+ if (fs.exists(path)) {
+ throw new AlreadyExistsStorageException(path);
+ }
+ fos = fs.create(path);
+ outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+ }
+
+ if (enabledStats) {
+ this.stats = new TableStatistics(this.schema);
+ }
+
+ try {
+ String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ os.reset();
+ pos = fos.getPos();
+ bufferedBytes = 0;
+ super.init();
+ }
+
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ Datum datum;
+ int rowBytes = 0;
+
+ for (int i = 0; i < columnNum; i++) {
+ datum = tuple.get(i);
+ rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
+
+ if(columnNum - 1 > i){
+ os.write((byte) delimiter);
+ rowBytes += 1;
+ }
+ if (enabledStats) {
+ stats.analyzeField(i, datum);
+ }
+ }
+ os.write(LF);
+ rowBytes += 1;
+
+ pos += rowBytes;
+ bufferedBytes += rowBytes;
+ if(bufferedBytes > BUFFER_SIZE){
+ flushBuffer();
+ }
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
+ private void flushBuffer() throws IOException {
+ if(os.getLength() > 0) {
+ os.writeTo(outputStream);
+ os.reset();
+ bufferedBytes = 0;
+ }
+ }
+ @Override
+ public long getOffset() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushBuffer();
+ outputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ try {
+ flush();
+
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+
+ if(deflateFilter != null) {
+ deflateFilter.finish();
+ deflateFilter.resetState();
+ deflateFilter = null;
+ }
+
+ os.close();
+ } finally {
+ IOUtils.cleanup(LOG, fos);
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean isCompress() {
+ return compressor != null;
+ }
+
+ public String getExtension() {
+ return codec != null ? codec.getDefaultExtension() : "";
+ }
+ }
+
+ public static class CSVScanner extends FileScanner implements SeekableScanner {
+ public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
+ throws IOException {
+ super(conf, schema, meta, fragment);
+ factory = new CompressionCodecFactory(conf);
+ codec = factory.getCodec(fragment.getPath());
+ if (codec == null || codec instanceof SplittableCompressionCodec) {
+ splittable = true;
+ }
+
+ //Delimiter
+ String delim = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
+ this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+ }
+
+ private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
+ private char delimiter;
+ private FileSystem fs;
+ private FSDataInputStream fis;
+ private InputStream is; // decompressed stream
+ private CompressionCodecFactory factory;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+ private Seekable filePosition;
+ private boolean splittable = false;
+ private long startOffset, end, pos;
+ private int currentIdx = 0, validIdx = 0, recordCount = 0;
+ private int[] targetColumnIndexes;
+ private boolean eof = false;
+ private final byte[] nullChars;
+ private SplitLineReader reader;
+ private ArrayList<Long> fileOffsets = new ArrayList<Long>();
+ private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
+ private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
+ private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
+ private SerializerDeserializer serde;
+
+ @Override
+ public void init() throws IOException {
+
+ // FileFragment information
+ if(fs == null) {
+ fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
+ }
+ if(fis == null) fis = fs.open(fragment.getPath());
+
+ recordCount = 0;
+ pos = startOffset = fragment.getStartKey();
+ end = startOffset + fragment.getEndKey();
+
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+ fis, decompressor, startOffset, end,
+ SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+ reader = new CompressedSplitLineReader(cIn, conf, null);
+ startOffset = cIn.getAdjustedStart();
+ end = cIn.getAdjustedEnd();
+ filePosition = cIn;
+ is = cIn;
+ } else {
+ is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ reader = new SplitLineReader(is, null);
+ filePosition = fis;
+ }
+ } else {
+ fis.seek(startOffset);
+ filePosition = fis;
+ is = fis;
+ reader = new SplitLineReader(is, null);
+ }
+
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+ }
+
+ try {
+ String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ super.init();
+ Arrays.sort(targetColumnIndexes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
+ "," + fs.getFileStatus(fragment.getPath()).getLen());
+ }
+
+ if (startOffset != 0) {
+ startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
+ pos = startOffset;
+ }
+ eof = false;
+ page();
+ }
+
+ private int maxBytesToConsume(long pos) {
+ return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
+ }
+
+ private long fragmentable() throws IOException {
+ return end - getFilePosition();
+ }
+
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompress()) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+
+ private void page() throws IOException {
+ // Index initialization
+ currentIdx = 0;
+ validIdx = 0;
+ int currentBufferPos = 0;
+ int bufferedSize = 0;
+
+ buffer.reset();
+ startOffsets.clear();
+ rowLengthList.clear();
+ fileOffsets.clear();
+
+ if(eof) return;
+
+ while (DEFAULT_PAGE_SIZE >= bufferedSize){
+
+ int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+ if(ret == 0){
+ break;
+ } else {
+ fileOffsets.add(pos);
+ pos += ret;
+ startOffsets.add(currentBufferPos);
+ currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
+ bufferedSize += ret;
+ validIdx++;
+ recordCount++;
+ }
+
+ if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
+ eof = true;
+ break;
+ }
+ }
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ try {
+ if (currentIdx == validIdx) {
+ if (eof) {
+ return null;
+ } else {
+ page();
+
+ if(currentIdx == validIdx){
+ return null;
+ }
+ }
+ }
+
+ long offset = -1;
+ if(!isCompress()){
+ offset = fileOffsets.get(currentIdx);
+ }
+
+ byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
+ rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
+ currentIdx++;
+ return new LazyTuple(schema, cells, offset, nullChars, serde);
+ } catch (Throwable t) {
+ LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
+ LOG.error("Tuple list current index: " + currentIdx, t);
+ throw new IOException(t);
+ }
+ }
+
+ private boolean isCompress() {
+ return codec != null;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+
+ init();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ IOUtils.cleanup(LOG, reader, is, fis);
+ fs = null;
+ is = null;
+ fis = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSVScanner processed record:" + recordCount);
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public void seek(long offset) throws IOException {
+ if(isCompress()) throw new UnsupportedException();
+
+ int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
+
+ if (tupleIndex > -1) {
+ this.currentIdx = tupleIndex;
+ } else if (isSplittable() && end >= offset && startOffset <= offset) {
+ eof = false;
+ fis.seek(offset);
+ pos = offset;
+ reader.reset();
+ this.currentIdx = 0;
+ this.validIdx = 0;
+ // pageBuffer();
+ } else {
+ throw new IOException("invalid offset " +
+ " < start : " + startOffset + " , " +
+ " end : " + end + " , " +
+ " filePos : " + filePosition.getPos() + " , " +
+ " input offset : " + offset + " >");
+ }
+ }
+
+ @Override
+ public long getNextOffset() throws IOException {
+ if(isCompress()) throw new UnsupportedException();
+
+ if (this.currentIdx == this.validIdx) {
+ if (fragmentable() <= 0) {
+ return -1;
+ } else {
+ page();
+ if(currentIdx == validIdx) return -1;
+ }
+ }
+ return fileOffsets.get(currentIdx);
+ }
+
+ @Override
+ public boolean isSplittable(){
+ return splittable;
+ }
+ }
+}
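
Both the appender and the scanner above are configured entirely through
TableMeta options (csvfile.delimiter, csvfile.null, csvfile.serde, plus the
compression codec). A configuration sketch; CatalogUtil.newTableMeta() and
TableMeta.putOption() are assumed to exist in the Tajo catalog module:

  import org.apache.tajo.catalog.CatalogUtil;
  import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;

  public class CsvMetaExample {
    static TableMeta tabSeparatedMeta() {
      TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);  // assumed helper
      meta.putOption(CSVFile.DELIMITER, "\\t");  // assumed API; unescaped by CSVAppender/CSVScanner
      meta.putOption(CSVFile.NULL, "\\\\N");     // bytes representing NULL on the read path
      return meta;
    }
  }
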
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
new file mode 100644
index 0000000..4f58e68
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * Line reader for compressed splits
+ *
+ * Reading records from a compressed split is tricky, as the
+ * LineRecordReader is using the reported compressed input stream
+ * position directly to determine when a split has ended. In addition the
+ * compressed input stream is usually faking the actual byte position, often
+ * updating it only after the first compressed block after the split is
+ * accessed.
+ *
+ * Depending upon where the last compressed block of the split ends relative
+ * to the record delimiters it can be easy to accidentally drop the last
+ * record or duplicate the last record between this split and the next.
+ *
+ * Split end scenarios:
+ *
+ * 1) Last block of split ends in the middle of a record
+ * Nothing special that needs to be done here, since the compressed input
+ * stream will report a position after the split end once the record
+ * is fully read. The consumer of the next split will discard the
+ * partial record at the start of the split normally, and no data is lost
+ * or duplicated between the splits.
+ *
+ * 2) Last block of split ends in the middle of a delimiter
+ * The line reader will continue to consume bytes into the next block to
+ * locate the end of the delimiter. If a custom delimiter is being used
+ * then the next record must be read by this split or it will be dropped.
+ * The consumer of the next split will not recognize the partial
+ * delimiter at the beginning of its split and will discard it along with
+ * the next record.
+ *
+ * However for the default delimiter processing there is a special case
+ * because CR, LF, and CRLF are all valid record delimiters. If the
+ * block ends with a CR then the reader must peek at the next byte to see
+ * if it is an LF and therefore part of the same record delimiter.
+ * Peeking at the next byte is an access to the next block and triggers
+ * the stream to report the end of the split. There are two cases based
+ * on the next byte:
+ *
+ * A) The next byte is LF
+ * The split needs to end after the current record is returned. The
+ * consumer of the next split will discard the first record, which
+ * is degenerate since LF is itself a delimiter, and start consuming
+ * records after that byte. If the current split tries to read
+ * another record then the record will be duplicated between splits.
+ *
+ * B) The next byte is not LF
+ * The current record will be returned but the stream will report
+ * the split has ended due to the peek into the next block. If the
+ * next record is not read then it will be lost, as the consumer of
+ * the next split will discard it before processing subsequent
+ * records. Therefore the next record beyond the reported split end
+ * must be consumed by this split to avoid data loss.
+ *
+ * 3) Last block of split ends at the beginning of a delimiter
+ * This is equivalent to case 1, as the reader will consume bytes into
+ * the next block and trigger the end of the split. No further records
+ * should be read as the consumer of the next split will discard the
+ * (degenerate) record at the beginning of its split.
+ *
+ * 4) Last block of split ends at the end of a delimiter
+ * Nothing special needs to be done here. The reader will not start
+ * examining the bytes into the next block until the next record is read,
+ * so the stream will not report the end of the split just yet. Once the
+ * next record is read then the next block will be accessed and the
+ * stream will indicate the end of the split. The consumer of the next
+ * split will correctly discard the first record of its split, and no
+ * data is lost or duplicated.
+ *
+ * If the default delimiter is used and the block ends at a CR then this
+ * is treated as case 2 since the reader does not yet know without
+ * looking at subsequent bytes whether the delimiter has ended.
+ *
+ * NOTE: It is assumed that compressed input streams *never* return bytes from
+ * multiple compressed blocks from a single read. Failure to do so will
+ * violate the buffering performed by this class, as it will access
+ * bytes into the next block after the split before returning all of the
+ * records from the previous block.
+ */
+
+public class CompressedSplitLineReader extends SplitLineReader {
+ SplitCompressionInputStream scin;
+ private boolean usingCRLF;
+ private boolean needAdditionalRecord = false;
+ private boolean finished = false;
+
+ public CompressedSplitLineReader(SplitCompressionInputStream in,
+ Configuration conf,
+ byte[] recordDelimiterBytes)
+ throws IOException {
+ super(in, conf, recordDelimiterBytes);
+ scin = in;
+ usingCRLF = (recordDelimiterBytes == null);
+ }
+
+ @Override
+ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+ throws IOException {
+ int bytesRead = in.read(buffer);
+
+ // If the split ended in the middle of a record delimiter then we need
+ // to read one additional record, as the consumer of the next split will
+ // not recognize the partial delimiter as a record.
+ // However if using the default delimiter and the next character is a
+ // linefeed then next split will treat it as a delimiter all by itself
+ // and the additional record read should not be performed.
+ if (inDelimiter && bytesRead > 0) {
+ if (usingCRLF) {
+ needAdditionalRecord = (buffer[0] != '\n');
+ } else {
+ needAdditionalRecord = true;
+ }
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
+ int bytesRead = 0;
+ if (!finished) {
+ // only allow at most one more record to be read after the stream
+ // reports the split ended
+ if (scin.getPos() > scin.getAdjustedEnd()) {
+ finished = true;
+ }
+
+ bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+ , int maxBytesToConsume) throws IOException {
+ int bytesRead = 0;
+ if (!finished) {
+ // only allow at most one more record to be read after the stream
+ // reports the split ended
+ if (scin.getPos() > scin.getAdjustedEnd()) {
+ finished = true;
+ }
+
+ bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public boolean needAdditionalRecordAfterSplit() {
+ return !finished && needAdditionalRecord;
+ }
+}
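
A minimal wiring sketch for the reader above, assuming Hadoop's BZip2Codec
(a SplittableCompressionCodec) and an already-open FSDataInputStream; it
mirrors what CSVScanner.init() does earlier in this patch:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.io.compress.*;
  import org.apache.tajo.storage.compress.CodecPool;

  public class SplitReaderExample {
    static SplitLineReader open(FSDataInputStream fis, Configuration conf,
                                long start, long end) throws IOException {
      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
      CompressionCodec codec = factory.getCodecByClassName(BZip2Codec.class.getName());
      Decompressor decompressor = CodecPool.getDecompressor(codec);
      SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
          fis, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK);
      // a null delimiter selects the default CR/LF handling described above
      return new CompressedSplitLineReader(cIn, conf, null);
    }
  }
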
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
new file mode 100644
index 0000000..8841a31
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+public class DataLocation {
+ private String host;
+ private int volumeId;
+
+ public DataLocation(String host, int volumeId) {
+ this.host = host;
+ this.volumeId = volumeId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getVolumeId() {
+ return volumeId;
+ }
+
+ @Override
+ public String toString() {
+ return "DataLocation{" +
+ "host=" + host +
+ ", volumeId=" + volumeId +
+ '}';
+ }
+}
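DataLocation is a plain value pair (host, disk volume id) used when scheduling work close to HDFS blocks. A one-line usage sketch; the host name is a placeholder:

    DataLocation loc = new DataLocation("worker-01.example.com", 2);
    System.out.println(loc.getHost() + " / volume " + loc.getVolumeId());
    // loc.toString() prints: DataLocation{host=worker-01.example.com, volumeId=2}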
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
new file mode 100644
index 0000000..064841f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+
+import java.io.IOException;
+
+public abstract class FileAppender implements Appender {
+ protected boolean inited = false;
+
+ protected final Configuration conf;
+ protected final TableMeta meta;
+ protected final Schema schema;
+ protected final Path path;
+
+ protected boolean enabledStats;
+
+ public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) {
+ this.conf = conf;
+ this.meta = meta;
+ this.schema = schema;
+ this.path = path;
+ }
+
+ public void init() throws IOException {
+ if (inited) {
+ throw new IllegalStateException("FileAppender is already initialized.");
+ }
+ inited = true;
+ }
+
+ public void enableStats() {
+ if (inited) {
+ throw new IllegalStateException("Should enable this option before init()");
+ }
+
+ this.enabledStats = true;
+ }
+
+ public abstract long getOffset() throws IOException;
+}
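FileAppender enforces a strict lifecycle: enableStats() is only legal before init(), and init() may run once. Because the full Appender interface is not shown here, the sketch below is a standalone illustration of the same enable-before-init guard pattern rather than a runnable Tajo appender:

    public class AppenderLifecycleDemo {
      private boolean inited = false;
      private boolean statsEnabled = false;

      public void enableStats() {
        if (inited) throw new IllegalStateException("enableStats() must precede init()");
        statsEnabled = true;
      }

      public void init() {
        if (inited) throw new IllegalStateException("already initialized");
        inited = true;
      }

      public static void main(String[] args) {
        AppenderLifecycleDemo a = new AppenderLifecycleDemo();
        a.enableStats();   // OK: options may change before init()
        a.init();
        try {
          a.enableStats(); // fails: options are frozen once init() has run
        } catch (IllegalStateException expected) {
          System.out.println(expected.getMessage());
        }
      }
    }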
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
new file mode 100644
index 0000000..c831822
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+
+public abstract class FileScanner implements Scanner {
+ private static final Log LOG = LogFactory.getLog(FileScanner.class);
+
+ protected boolean inited = false;
+ protected final Configuration conf;
+ protected final TableMeta meta;
+ protected final Schema schema;
+ protected final FileFragment fragment;
+ protected final int columnNum;
+
+ protected Column [] targets;
+
+ public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) {
+ this.conf = conf;
+ this.meta = meta;
+ this.schema = schema;
+ this.fragment = fragment;
+ this.columnNum = this.schema.getColumnNum();
+ }
+
+ public void init() throws IOException {
+ inited = true;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public void setTarget(Column[] targets) {
+ if (inited) {
+ throw new IllegalStateException("Should be called before init()");
+ }
+ this.targets = targets;
+ }
+
+ public void setSearchCondition(Object expr) {
+ if (inited) {
+ throw new IllegalStateException("Should be called before init()");
+ }
+ }
+
+ public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
+ String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
+ FileSystem fs;
+ if(tajoUser != null) {
+ try {
+ fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
+ } catch (InterruptedException e) {
+ LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]");
+ fs = FileSystem.get(path.toUri(), tajoConf);
+ }
+ } else {
+ fs = FileSystem.get(path.toUri(), tajoConf);
+ }
+
+ return fs;
+ }
+}
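The static getFileSystem() helper resolves a FileSystem as the configured Tajo user when TajoConf.ConfVars.USERNAME is set, falling back to the default caller otherwise. A short usage sketch; the HDFS URI is a placeholder:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.storage.FileScanner;

    public class GetFileSystemDemo {
      public static void main(String[] args) throws Exception {
        TajoConf conf = new TajoConf();
        Path path = new Path("hdfs://namenode.example.com:8020/user/tajo/warehouse");
        // Resolved under the configured Tajo user when one is set.
        FileSystem fs = FileScanner.getFileSystem(conf, path);
        System.out.println(fs.getUri());
      }
    }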
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
new file mode 100644
index 0000000..f05a316
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.exception.UnsupportedException;
+
+import java.net.InetAddress;
+
+/**
+ * An instance of FrameTuple is an immutable tuple.
+ * It wraps two tuples and presents them as a single Tuple
+ * during join qualification evaluation.
+ */
+public class FrameTuple implements Tuple, Cloneable {
+ private int size;
+ private int leftSize;
+
+ private Tuple left;
+ private Tuple right;
+
+ public FrameTuple() {}
+
+ public FrameTuple(Tuple left, Tuple right) {
+ set(left, right);
+ }
+
+ public void set(Tuple left, Tuple right) {
+ this.size = left.size() + right.size();
+ this.left = left;
+ this.leftSize = left.size();
+ this.right = right;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public boolean contains(int fieldId) {
+ Preconditions.checkArgument(fieldId < size,
+ "field id out of range: " + fieldId);
+
+ if (fieldId < leftSize) {
+ return left.contains(fieldId);
+ } else {
+ return right.contains(fieldId - leftSize);
+ }
+ }
+
+ @Override
+ public boolean isNull(int fieldId) {
+ return get(fieldId) instanceof NullDatum;
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(int fieldId, Datum value) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(int fieldId, Datum[] values) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void setOffset(long offset) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public long getOffset() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(Datum [] values) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public Datum get(int fieldId) {
+ Preconditions.checkArgument(fieldId < size,
+ "field id out of range: " + fieldId);
+
+ if (fieldId < leftSize) {
+ return left.get(fieldId);
+ } else {
+ return right.get(fieldId - leftSize);
+ }
+ }
+
+ @Override
+ public BooleanDatum getBoolean(int fieldId) {
+ return (BooleanDatum) get(fieldId);
+ }
+
+ @Override
+ public BitDatum getByte(int fieldId) {
+ return (BitDatum) get(fieldId);
+ }
+
+ @Override
+ public CharDatum getChar(int fieldId) {
+ return (CharDatum) get(fieldId);
+ }
+
+ @Override
+ public BlobDatum getBytes(int fieldId) {
+ return (BlobDatum) get(fieldId);
+ }
+
+ @Override
+ public Int2Datum getShort(int fieldId) {
+ return (Int2Datum) get(fieldId);
+ }
+
+ @Override
+ public Int4Datum getInt(int fieldId) {
+ return (Int4Datum) get(fieldId);
+ }
+
+ @Override
+ public Int8Datum getLong(int fieldId) {
+ return (Int8Datum) get(fieldId);
+ }
+
+ @Override
+ public Float4Datum getFloat(int fieldId) {
+ return (Float4Datum) get(fieldId);
+ }
+
+ @Override
+ public Float8Datum getDouble(int fieldId) {
+ return (Float8Datum) get(fieldId);
+ }
+
+ @Override
+ public Inet4Datum getIPv4(int fieldId) {
+ return (Inet4Datum) get(fieldId);
+ }
+
+ @Override
+ public byte[] getIPv4Bytes(int fieldId) {
+ return get(fieldId).asByteArray();
+ }
+
+ @Override
+ public InetAddress getIPv6(int fieldId) {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public byte[] getIPv6Bytes(int fieldId) {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public TextDatum getString(int fieldId) {
+ return (TextDatum) get(fieldId);
+ }
+
+ @Override
+ public TextDatum getText(int fieldId) {
+ return (TextDatum) get(fieldId);
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ FrameTuple frameTuple = (FrameTuple) super.clone();
+ frameTuple.set(this.left.clone(), this.right.clone());
+ return frameTuple;
+ }
+
+ @Override
+ public Datum[] getValues(){
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public String toString() {
+ boolean first = true;
+ StringBuilder str = new StringBuilder();
+ str.append("(");
+ for(int i=0; i < size(); i++) {
+ if(contains(i)) {
+ if(first) {
+ first = false;
+ } else {
+ str.append(", ");
+ }
+ str.append(i)
+ .append("=>")
+ .append(get(i));
+ }
+ }
+ str.append(")");
+ return str.toString();
+ }
+}
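In practice a single FrameTuple is re-pointed at candidate row pairs via set() while a join qual is evaluated, avoiding per-row allocation. A brief sketch, assuming the VTuple implementation and DatumFactory from this module (neither is shown in this diff):

    import org.apache.tajo.datum.DatumFactory;
    import org.apache.tajo.storage.FrameTuple;
    import org.apache.tajo.storage.VTuple;

    public class FrameTupleDemo {
      public static void main(String[] args) {
        VTuple left = new VTuple(2);
        left.put(0, DatumFactory.createInt4(1));
        left.put(1, DatumFactory.createText("l_name"));
        VTuple right = new VTuple(1);
        right.put(0, DatumFactory.createText("r_name"));

        // Field ids 0..1 route to the left tuple; field id 2 routes to right.get(0).
        FrameTuple joined = new FrameTuple(left, right);
        System.out.println(joined.size());      // 3
        System.out.println(joined.getText(2));  // r_name

        // Reuse the same wrapper for the next row pair without reallocating.
        joined.set(left, right);
      }
    }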
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
new file mode 100644
index 0000000..4d484df
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.exception.InvalidCastException;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+
+public class LazyTuple implements Tuple, Cloneable {
+ private long offset;
+ private Datum[] values;
+ private byte[][] textBytes;
+ private Schema schema;
+ private byte[] nullBytes;
+ private SerializerDeserializer serializeDeserialize;
+
+ public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
+ this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
+ }
+
+ public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
+ this.schema = schema;
+ this.textBytes = textBytes;
+ this.values = new Datum[schema.getColumnNum()];
+ this.offset = offset;
+ this.nullBytes = nullBytes;
+ this.serializeDeserialize = serde;
+ }
+
+ public LazyTuple(LazyTuple tuple) {
+ this.values = tuple.getValues();
+ this.offset = tuple.offset;
+ this.schema = tuple.schema;
+ this.textBytes = new byte[size()][];
+ this.nullBytes = tuple.nullBytes;
+ this.serializeDeserialize = tuple.serializeDeserialize;
+ }
+
+ @Override
+ public int size() {
+ return values.length;
+ }
+
+ @Override
+ public boolean contains(int fieldId) {
+ return textBytes[fieldId] != null || values[fieldId] != null;
+ }
+
+ @Override
+ public boolean isNull(int fieldId) {
+ return get(fieldId) instanceof NullDatum;
+ }
+
+ @Override
+ public void clear() {
+ for (int i = 0; i < values.length; i++) {
+ values[i] = null;
+ textBytes[i] = null;
+ }
+ }
+
+ //////////////////////////////////////////////////////
+ // Setter
+ //////////////////////////////////////////////////////
+ @Override
+ public void put(int fieldId, Datum value) {
+ values[fieldId] = value;
+ textBytes[fieldId] = null;
+ }
+
+ @Override
+ public void put(int fieldId, Datum[] values) {
+ for (int i = fieldId, j = 0; j < values.length; i++, j++) {
+ this.values[i] = values[j];
+ textBytes[i] = null; // invalidate only the overwritten fields' cached bytes
+ }
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
+ values[i] = tuple.get(j);
+ textBytes[i] = null;
+ }
+ }
+
+ @Override
+ public void put(Datum[] values) {
+ System.arraycopy(values, 0, this.values, 0, size());
+ this.textBytes = new byte[size()][]; // every field was overwritten; reset the cache
+ }
+
+ //////////////////////////////////////////////////////
+ // Getter
+ //////////////////////////////////////////////////////
+ @Override
+ public Datum get(int fieldId) {
+ if (values[fieldId] != null)
+ return values[fieldId];
+ else if (textBytes.length <= fieldId) {
+ values[fieldId] = NullDatum.get(); // row had fewer fields than the schema (e.g. 3 columns, separator ',', row text "a,")
+ } else if (textBytes[fieldId] != null) {
+ try {
+ values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
+ textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
+ } catch (Exception e) {
+ values[fieldId] = NullDatum.get();
+ }
+ textBytes[fieldId] = null;
+ } else {
+ // non-projected column: no raw bytes were read, so the value stays null
+ }
+ return values[fieldId];
+ }
+
+ @Override
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ @Override
+ public long getOffset() {
+ return this.offset;
+ }
+
+ @Override
+ public BooleanDatum getBoolean(int fieldId) {
+ return (BooleanDatum) get(fieldId);
+ }
+
+ @Override
+ public BitDatum getByte(int fieldId) {
+ return (BitDatum) get(fieldId);
+ }
+
+ @Override
+ public CharDatum getChar(int fieldId) {
+ return (CharDatum) get(fieldId);
+ }
+
+ @Override
+ public BlobDatum getBytes(int fieldId) {
+ return (BlobDatum) get(fieldId);
+ }
+
+ @Override
+ public Int2Datum getShort(int fieldId) {
+ return (Int2Datum) get(fieldId);
+ }
+
+ @Override
+ public Int4Datum getInt(int fieldId) {
+ return (Int4Datum) get(fieldId);
+ }
+
+ @Override
+ public Int8Datum getLong(int fieldId) {
+ return (Int8Datum) get(fieldId);
+ }
+
+ @Override
+ public Float4Datum getFloat(int fieldId) {
+ return (Float4Datum) get(fieldId);
+ }
+
+ @Override
+ public Float8Datum getDouble(int fieldId) {
+ return (Float8Datum) get(fieldId);
+ }
+
+ @Override
+ public Inet4Datum getIPv4(int fieldId) {
+ return (Inet4Datum) get(fieldId);
+ }
+
+ @Override
+ public byte[] getIPv4Bytes(int fieldId) {
+ return get(fieldId).asByteArray();
+ }
+
+ @Override
+ public InetAddress getIPv6(int fieldId) {
+ throw new InvalidCastException("IPv6 is unsupported yet");
+ }
+
+ @Override
+ public byte[] getIPv6Bytes(int fieldId) {
+ throw new InvalidCastException("IPv6 is unsupported yet");
+ }
+
+ @Override
+ public TextDatum getString(int fieldId) {
+ return (TextDatum) get(fieldId);
+ }
+
+ @Override
+ public TextDatum getText(int fieldId) {
+ return (TextDatum) get(fieldId);
+ }
+
+ public byte[] getTextBytes(int fieldId) {
+ if(textBytes[fieldId] != null)
+ return textBytes[fieldId];
+ else {
+ return get(fieldId).asTextBytes();
+ }
+ }
+
+ @Override
+ public String toString() {
+ boolean first = true;
+ StringBuilder str = new StringBuilder();
+ str.append("(");
+ Datum d;
+ for (int i = 0; i < values.length; i++) {
+ d = get(i);
+ if (d != null) {
+ if (first) {
+ first = false;
+ } else {
+ str.append(", ");
+ }
+ str.append(i)
+ .append("=>")
+ .append(d);
+ }
+ }
+ str.append(")");
+ return str.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 37;
+ for (int i = 0; i < values.length; i++) {
+ Datum d = get(i);
+ if (d != null) {
+ hashCode ^= (d.hashCode() * 41);
+ } else {
+ hashCode = hashCode ^ (i + 17);
+ }
+ }
+
+ return hashCode;
+ }
+
+ @Override
+ public Datum[] getValues() {
+ Datum[] datums = new Datum[values.length];
+ for (int i = 0; i < values.length; i++) {
+ datums[i] = get(i);
+ }
+ return datums;
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ LazyTuple lazyTuple = (LazyTuple) super.clone();
+
+ lazyTuple.values = getValues(); // new array, but the immutable Datum instances are shared
+ lazyTuple.textBytes = new byte[size()][];
+ return lazyTuple;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tuple) {
+ Tuple other = (Tuple) obj;
+ return Arrays.equals(getValues(), other.getValues());
+ }
+ return false;
+ }
+}
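LazyTuple defers text-to-Datum deserialization until a field is first read, so columns a query never touches are never parsed. A brief sketch, assuming Schema.addColumn and TajoDataTypes.Type from tajo-catalog/tajo-common (not shown in this diff):

    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.storage.LazyTuple;

    public class LazyTupleDemo {
      public static void main(String[] args) {
        Schema schema = new Schema();
        schema.addColumn("id", Type.INT4);
        schema.addColumn("name", Type.TEXT);

        // Raw field bytes as split from one text row; nothing is parsed yet.
        byte[][] raw = new byte[][] { "42".getBytes(), "tajo".getBytes() };
        LazyTuple row = new LazyTuple(schema, raw, 0L);

        // First access deserializes via TextSerializerDeserializer and caches the Datum.
        System.out.println(row.getInt(0));   // 42
        System.out.println(row.getText(1));  // tajo
      }
    }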