You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:43 UTC
[28/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
deleted file mode 100644
index 2d68870..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Util;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-
-public class DiskUtil {
-
- static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
-
- public enum OSType {
- OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
- }
-
- static private OSType getOSType() {
- String osName = System.getProperty("os.name");
- if (osName.contains("Windows")
- && (osName.contains("XP") || osName.contains("2003")
- || osName.contains("Vista")
- || osName.contains("Windows_7")
- || osName.contains("Windows 7") || osName
- .contains("Windows7"))) {
- return OSType.OS_TYPE_WINXP;
- } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
- return OSType.OS_TYPE_SOLARIS;
- } else if (osName.contains("Mac")) {
- return OSType.OS_TYPE_MAC;
- } else {
- return OSType.OS_TYPE_UNIX;
- }
- }
-
- public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
- List<DiskDeviceInfo> deviceInfos;
-
- if(getOSType() == OSType.OS_TYPE_UNIX) {
- deviceInfos = getUnixDiskDeviceInfos();
- setDeviceMountInfo(deviceInfos);
- } else {
- deviceInfos = getDefaultDiskDeviceInfos();
- }
-
- return deviceInfos;
- }
-
- private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
- List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
- File file = new File(UNIX_DISK_DEVICE_PATH);
- if(!file.exists()) {
- System.out.println("No partition file:" + file.getAbsolutePath());
- return getDefaultDiskDeviceInfos();
- }
-
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
- String line = null;
-
- int count = 0;
- Set<String> deviceNames = new TreeSet<String>();
- while((line = reader.readLine()) != null) {
- if(count > 0 && !line.trim().isEmpty()) {
- String[] tokens = line.trim().split(" +");
- if(tokens.length == 4) {
- String deviceName = getDiskDeviceName(tokens[3]);
- deviceNames.add(deviceName);
- }
- }
- count++;
- }
-
- int id = 0;
- for(String eachDeviceName: deviceNames) {
- DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
- diskDeviceInfo.setName(eachDeviceName);
-
- //TODO set addtional info
- // /sys/block/sda/queue
- infos.add(diskDeviceInfo);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if(reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- }
- }
- }
-
- return infos;
- }
-
- private static String getDiskDeviceName(String partitionName) {
- byte[] bytes = partitionName.getBytes();
-
- byte[] result = new byte[bytes.length];
- int length = 0;
- for(int i = 0; i < bytes.length; i++, length++) {
- if(bytes[i] >= '0' && bytes[i] <= '9') {
- break;
- } else {
- result[i] = bytes[i];
- }
- }
-
- return new String(result, 0, length);
- }
-
- public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
- DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
- diskDeviceInfo.setName("default");
-
- List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
- infos.add(diskDeviceInfo);
-
- return infos;
- }
-
-
- private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
- Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
- for(DiskDeviceInfo eachDevice: deviceInfos) {
- deviceMap.put(eachDevice.getName(), eachDevice);
- }
-
- BufferedReader mountOutput = null;
- try {
- Process mountProcess = Runtime.getRuntime().exec("mount");
- mountOutput = new BufferedReader(new InputStreamReader(
- mountProcess.getInputStream()));
- while (true) {
- String line = mountOutput.readLine();
- if (line == null) {
- break;
- }
-
- int indexStart = line.indexOf(" on /");
- int indexEnd = line.indexOf(" ", indexStart + 4);
-
- String deviceName = line.substring(0, indexStart).trim();
- String[] deviceNameTokens = deviceName.split("/");
- if(deviceNameTokens.length == 3) {
- if("dev".equals(deviceNameTokens[1])) {
- String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
- String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
-
- DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
- if(diskDeviceInfo != null) {
- diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
- }
- }
- }
- }
- } catch (IOException e) {
- throw e;
- } finally {
- if (mountOutput != null) {
- mountOutput.close();
- }
- }
- }
-
- public static int getDataNodeStorageSize(){
- return getStorageDirs().size();
- }
-
- public static List<URI> getStorageDirs(){
- Configuration conf = new HdfsConfiguration();
- Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
- return Util.stringCollectionAsURIs(dirNames);
- }
-
- public static void main(String[] args) throws Exception {
- System.out.println("/dev/sde1".split("/").length);
- for(String eachToken: "/dev/sde1".split("/")) {
- System.out.println(eachToken);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
deleted file mode 100644
index 7df4584..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-
-public interface FieldSerializerDeserializer {
-
- public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
-
- public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
deleted file mode 100644
index 117d3da..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-
-import java.io.IOException;
-
- /**
-  * Skeleton for appenders that write to a file path. It keeps the common
-  * construction arguments, resolves the output path and tracks whether
-  * {@link #init()} has run.
-  */
- public abstract class FileAppender implements Appender {
-   private static final Log LOG = LogFactory.getLog(FileAppender.class);
-
-   /** Set by {@link #init()}; guards against double initialization. */
-   protected boolean inited = false;
-
-   protected final Configuration conf;
-   protected final TableMeta meta;
-   protected final Schema schema;
-   protected final Path workDir;
-   protected final QueryUnitAttemptId taskAttemptId;
-
-   /** Turned on by {@link #enableStats()}. */
-   protected boolean enabledStats;
-   /** Output location; stays null if resolving it fails in the constructor. */
-   protected Path path;
-
-   /**
-    * Stores the arguments and resolves the output path: {@code workDir}
-    * itself when there is no task attempt id, otherwise the path given by
-    * the file storage manager. An IOException during that lookup is logged
-    * and not rethrown.
-    */
-   public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
-                       TableMeta meta, Path workDir) {
-     this.conf = conf;
-     this.taskAttemptId = taskAttemptId;
-     this.schema = schema;
-     this.meta = meta;
-     this.workDir = workDir;
-
-     if (taskAttemptId == null) {
-       this.path = workDir;
-     } else {
-       try {
-         this.path = StorageManager.getFileStorageManager((TajoConf) conf).getAppenderFilePath(taskAttemptId, workDir);
-       } catch (IOException e) {
-         LOG.error(e.getMessage(), e);
-       }
-     }
-   }
-
-   /**
-    * Marks the appender as initialized.
-    *
-    * @throws IllegalStateException if called a second time
-    */
-   public void init() throws IOException {
-     if (!inited) {
-       inited = true;
-     } else {
-       throw new IllegalStateException("FileAppender is already initialized.");
-     }
-   }
-
-   /**
-    * Switches statistics on.
-    *
-    * @throws IllegalStateException if {@link #init()} has already run
-    */
-   public void enableStats() {
-     if (!inited) {
-       this.enabledStats = true;
-     } else {
-       throw new IllegalStateException("Should enable this option before init()");
-     }
-   }
-
-   /** Returns {@link #getOffset()} as the output size estimate. */
-   public long getEstimatedOutputSize() throws IOException {
-     return getOffset();
-   }
-
-   public abstract long getOffset() throws IOException;
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
deleted file mode 100644
index 038f0f4..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-
- /**
-  * Skeleton for scanners that read one {@link FileFragment}. It keeps the
-  * construction arguments, the projection targets, a progress value and the
-  * input statistics.
-  */
- public abstract class FileScanner implements Scanner {
-   private static final Log LOG = LogFactory.getLog(FileScanner.class);
-
-   /** Set by {@link #init()}; setters that must run earlier check it. */
-   protected boolean inited = false;
-   protected final Configuration conf;
-   protected final TableMeta meta;
-   protected final Schema schema;
-   protected final FileFragment fragment;
-   /** Number of columns in {@link #schema}. */
-   protected final int columnNum;
-
-   protected Column [] targets;
-
-   protected float progress;
-
-   protected TableStats tableStats;
-
-   /**
-    * @param fragment must be a {@link FileFragment}; it is cast unconditionally
-    */
-   public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
-     this.conf = conf;
-     this.meta = meta;
-     this.schema = schema;
-     this.fragment = (FileFragment)fragment;
-     this.tableStats = new TableStats();
-     // NOTE(review): this dereferences schema although init() tolerates a null schema.
-     this.columnNum = this.schema.size();
-   }
-
-   /**
-    * Marks the scanner as initialized, resets progress to zero and seeds the
-    * table statistics: byte count and a block count of 1 from the fragment,
-    * plus one empty {@link ColumnStats} per schema column.
-    */
-   public void init() throws IOException {
-     inited = true;
-     progress = 0.0f;
-
-     if (fragment != null) {
-       tableStats.setNumBytes(fragment.getLength());
-       tableStats.setNumBlocks(1);
-     }
-
-     if (schema != null) {
-       for (Column eachColumn : schema.getColumns()) {
-         ColumnStats columnStats = new ColumnStats(eachColumn);
-         tableStats.addColumnStat(columnStats);
-       }
-     }
-   }
-
-   @Override
-   public Schema getSchema() {
-     return schema;
-   }
-
-   /**
-    * Sets the projected columns.
-    *
-    * @throws IllegalStateException if {@link #init()} has already run
-    */
-   @Override
-   public void setTarget(Column[] targets) {
-     if (inited) {
-       throw new IllegalStateException("Should be called before init()");
-     }
-     this.targets = targets;
-   }
-
-   /**
-    * Only validates call order here; the expression itself is not stored.
-    *
-    * @throws IllegalStateException if {@link #init()} has already run
-    */
-   public void setSearchCondition(Object expr) {
-     if (inited) {
-       throw new IllegalStateException("Should be called before init()");
-     }
-   }
-
-   /**
-    * Opens the file system for {@code path}, as the configured Tajo user when
-    * one is set.
-    *
-    * <p>If the user-specific lookup is interrupted, the method falls back to
-    * the plain lookup and then restores the thread's interrupt flag so that
-    * callers can still observe the interruption.
-    *
-    * @throws IOException if the file system cannot be obtained
-    */
-   public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
-     String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
-     FileSystem fs;
-     if (tajoUser != null) {
-       try {
-         fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
-       } catch (InterruptedException e) {
-         LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]", e);
-         try {
-           fs = FileSystem.get(path.toUri(), tajoConf);
-         } finally {
-           // Re-assert after the fallback so the fallback itself is not disturbed.
-           Thread.currentThread().interrupt();
-         }
-       }
-     } else {
-       fs = FileSystem.get(path.toUri(), tajoConf);
-     }
-
-     return fs;
-   }
-
-   @Override
-   public float getProgress() {
-     return progress;
-   }
-
-   @Override
-   public TableStats getInputStats() {
-     return tableStats;
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
deleted file mode 100644
index 442ed5e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ /dev/null
@@ -1,832 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.*;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FileStorageManager extends StorageManager {
- // Shared logger; static so it is not re-created for every manager instance.
- private static final Log LOG = LogFactory.getLog(FileStorageManager.class);
-
- /** Prefix of every output file name built by getAppenderFilePath(). */
- static final String OUTPUT_FILE_PREFIX="part-";
-
- // NumberFormat is not thread-safe, hence one formatter per thread.
- // Formats a number with at least 2 digits and no grouping separators.
- static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
-     new ThreadLocal<NumberFormat>() {
-       @Override
-       public NumberFormat initialValue() {
-         NumberFormat fmt = NumberFormat.getInstance();
-         fmt.setGroupingUsed(false);
-         fmt.setMinimumIntegerDigits(2);
-         return fmt;
-       }
-     };
- // Formats a number with at least 6 digits and no grouping separators.
- static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
-     new ThreadLocal<NumberFormat>() {
-       @Override
-       public NumberFormat initialValue() {
-         NumberFormat fmt = NumberFormat.getInstance();
-         fmt.setGroupingUsed(false);
-         fmt.setMinimumIntegerDigits(6);
-         return fmt;
-       }
-     };
-
- // Formats a number with at least 3 digits and no grouping separators.
- static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
-     new ThreadLocal<NumberFormat>() {
-       @Override
-       public NumberFormat initialValue() {
-         NumberFormat fmt = NumberFormat.getInstance();
-         fmt.setGroupingUsed(false);
-         fmt.setMinimumIntegerDigits(3);
-         return fmt;
-       }
-     };
-
- // The three fields below are assigned in storageInit().
- protected FileSystem fs;
- protected Path tableBaseDir;
- protected boolean blocksMetadataEnabled;
- // Volume id built from the bytes of int 0; getDiskIds() subtracts its hash code.
- private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
-
- /**
- * Creates a manager for the given store type; the type is handed to the
- * superclass constructor unchanged.
- */
- public FileStorageManager(StoreType storeType) {
- super(storeType);
- }
-
- /**
-  * Resolves the warehouse directory and its file system from the
-  * configuration and reads the HDFS block-metadata flag, warning when that
-  * flag is off.
-  */
- @Override
- protected void storageInit() throws IOException {
-   tableBaseDir = TajoConf.getWarehouseDir(conf);
-   fs = tableBaseDir.getFileSystem(conf);
-   blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-       DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-   if (blocksMetadataEnabled) {
-     return;
-   }
-   LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
- }
-
- /**
-  * Looks up the status of {@code path} on its own file system and returns
-  * a scanner over the whole file.
-  */
- public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
-     throws IOException {
-   FileStatus fileStatus = path.getFileSystem(conf).getFileStatus(path);
-   return getFileScanner(meta, schema, path, fileStatus);
- }
-
- /**
- * Returns a scanner over the whole file: the fragment starts at offset 0,
- * spans status.getLen() bytes and is named after the last path component.
- */
- public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
- throws IOException {
- // Kept as a Fragment-typed local so the getScanner(…, Fragment) call is resolved as before.
- Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
- return getScanner(meta, schema, fragment);
- }
-
- /** Returns the file system assigned in storageInit() (that of the warehouse directory). */
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
- /** Returns the warehouse directory assigned in storageInit(). */
- public Path getWarehouseDir() {
- return this.tableBaseDir;
- }
-
- /** Recursively deletes {@code tablePath} on its own file system. */
- public void delete(Path tablePath) throws IOException {
-   tablePath.getFileSystem(conf).delete(tablePath, true);
- }
-
- /** Reports whether {@code path} exists on its own file system. */
- public boolean exists(Path path) throws IOException {
-   return path.getFileSystem(conf).exists(path);
- }
-
- /**
-  * Removes everything directly under {@code path}, recursively, while
-  * leaving {@code path} itself in place.
-  *
-  * @param path the directory whose entries are deleted
-  * @throws IOException if listing or deleting fails
-  */
- public void deleteData(Path path) throws IOException {
-   FileSystem targetFs = path.getFileSystem(conf);
-   FileStatus[] entries = targetFs.listStatus(path);
-   for (int i = 0; i < entries.length; i++) {
-     targetFs.delete(entries[i].getPath(), true);
-   }
- }
-
- /** Returns the path of {@code tableName} resolved against the warehouse directory. */
- public Path getTablePath(String tableName) {
- return new Path(tableBaseDir, tableName);
- }
-
- /**
- * Test convenience: delegates to the five-argument getAppender, passing
- * null for its first two arguments.
- */
- @VisibleForTesting
- public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
- throws IOException {
- return getAppender(null, null, meta, schema, filePath);
- }
-
- /**
-  * Splits the table stored under the warehouse directory into fragments of
-  * at most the warehouse file system's default block size.
-  */
- public FileFragment[] split(String tableName) throws IOException {
-   return split(tableName, new Path(tableBaseDir, tableName), fs.getDefaultBlockSize());
- }
-
- /**
-  * Splits the table stored under the warehouse directory into fragments of
-  * at most {@code fragmentSize} bytes.
-  */
- public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-   return split(tableName, new Path(tableBaseDir, tableName), fragmentSize);
- }
-
- /**
-  * Returns one whole-file fragment (offset 0, full length) for every entry
-  * listed under {@code tablePath}; fragments are named after the last
-  * component of {@code tablePath}.
-  */
- public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
-   FileStatus[] children = tablePath.getFileSystem(conf).listStatus(tablePath);
-   List<FileFragment> fragments = new ArrayList<FileFragment>();
-   for (int i = 0; i < children.length; i++) {
-     FileStatus child = children[i];
-     fragments.add(new FileFragment(tablePath.getName(), child.getPath(), 0, child.getLen()));
-   }
-   return fragments.toArray(new FileFragment[fragments.size()]);
- }
-
- public FileFragment[] split(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
- }
-
- /**
- * Splits the files under tablePath into fragments named tableName.
- * NOTE(review): the block size comes from the warehouse file system (the fs
- * field), not from tablePath's own file system as in split(Path) - confirm
- * this is intended.
- */
- public FileFragment[] split(String tableName, Path tablePath) throws IOException {
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- /**
-  * Cuts every file listed under {@code tablePath} into consecutive
-  * fragments of at most {@code size} bytes.
-  *
-  * <p>The logic was a line-for-line copy of
-  * {@link #splitNG(Configuration, String, TableMeta, Path, long)}, which
-  * never reads its {@code meta} argument, so this method now delegates to
-  * it instead of keeping a second copy.
-  */
- private FileFragment[] split(String tableName, Path tablePath, long size)
-     throws IOException {
-   return splitNG(conf, tableName, null, tablePath, size);
- }
-
- /**
-  * Cuts every file listed under {@code tablePath} into consecutive
-  * fragments of at most {@code size} bytes; the final fragment of a file
-  * carries the remainder. A file no larger than {@code size} yields a
-  * single fragment.
-  *
-  * @param conf      configuration used to resolve the file system of {@code tablePath}
-  * @param tableName name given to every fragment
-  * @param meta      not used by this method
-  * @param tablePath directory (or file) whose listing is split
-  * @param size      maximum fragment length in bytes
-  * @return the fragments in listing order
-  * @throws IllegalArgumentException if a file must be split but {@code size}
-  *         is not positive (this used to loop forever)
-  * @throws IOException if the listing fails
-  */
- public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
-                                      Path tablePath, long size)
-     throws IOException {
-   FileSystem fs = tablePath.getFileSystem(conf);
-   List<FileFragment> fragments = new ArrayList<FileFragment>();
-
-   for (FileStatus file : fs.listStatus(tablePath)) {
-     long remaining = file.getLen();
-     long start = 0;
-     if (remaining > size) {
-       // With size <= 0 the loop below could never make progress.
-       Preconditions.checkArgument(size > 0, "Fragment size must be positive: %s", size);
-     }
-     while (remaining > size) {
-       fragments.add(new FileFragment(tableName, file.getPath(), start, size));
-       start += size;
-       remaining -= size;
-     }
-     // Remainder, or the whole file when it was never split (start is still 0).
-     fragments.add(new FileFragment(tableName, file.getPath(), start, remaining));
-   }
-
-   return fragments.toArray(new FileFragment[fragments.size()]);
- }
-
- /**
-  * Returns the content-summary length of {@code tablePath}, or 0 when the
-  * path does not exist.
-  */
- public long calculateSize(Path tablePath) throws IOException {
-   FileSystem targetFs = tablePath.getFileSystem(conf);
-   if (!targetFs.exists(tablePath)) {
-     return 0;
-   }
-   return targetFs.getContentSummary(tablePath).getLength();
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // FileInputFormat Area
- /////////////////////////////////////////////////////////////////////////////
-
- public static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /**
-  * Builds the output file path of a task attempt.
-  *
-  * <p>The file is named part-ss-nnnnnn-qqq and placed under the result
-  * directory of {@code workDir}: ss is the execution block id (at least 2
-  * digits), nnnnnn the task id (at least 6 digits) and qqq a sequence
-  * number, always 0 here (at least 3 digits).
-  *
-  * @return {@code workDir} itself when {@code taskAttemptId} is null (test usage)
-  */
- public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) {
-   if (taskAttemptId == null) {
-     // For testcase
-     return workDir;
-   }
-
-   String blockPart = OUTPUT_FILE_FORMAT_SUBQUERY.get().format(
-       taskAttemptId.getQueryUnitId().getExecutionBlockId().getId());
-   String taskPart = OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId());
-   String seqPart = OUTPUT_FILE_FORMAT_SEQ.get().format(0);
-
-   Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
-       OUTPUT_FILE_PREFIX + blockPart + "-" + taskPart + "-" + seqPart);
-   LOG.info("Output File Path: " + outFilePath);
-
-   return outFilePath;
- }
-
- /**
-  * Composite PathFilter: a path passes only when every wrapped filter
-  * accepts it. listStatus() uses it to wrap the built-in hiddenFileFilter.
-  */
- private static class MultiPathFilter implements PathFilter {
-   private final List<PathFilter> filters;
-
-   public MultiPathFilter(List<PathFilter> filters) {
-     this.filters = filters;
-   }
-
-   public boolean accept(Path path) {
-     Iterator<PathFilter> it = filters.iterator();
-     while (it.hasNext()) {
-       if (!it.next().accept(path)) {
-         return false; // first rejection decides
-       }
-     }
-     return true;
-   }
- }
-
- /**
- * Expands the given paths (glob patterns are allowed) into file statuses.
- * A matched directory contributes its direct children; any other match is
- * added as is. Entries rejected by hiddenFileFilter are left out.
- * Subclasses may override to, e.g., select only files matching a regular
- * expression.
- *
- * @param dirs paths or glob patterns to expand; must not be empty
- * @return list of FileStatus objects
- * @throws IOException if no path is given, or (as InvalidInputException)
- * if any path does not exist or matches no files
- */
- protected List<FileStatus> listStatus(Path... dirs) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
-
- // Problems are collected for all paths and reported together at the end.
- List<IOException> errors = new ArrayList<IOException>();
-
- // creates a MultiPathFilter that currently wraps only the
- // hiddenFileFilter.
- List<PathFilter> filters = new ArrayList<PathFilter>();
- filters.add(hiddenFileFilter);
-
- PathFilter inputFilter = new MultiPathFilter(filters);
-
- for (int i = 0; i < dirs.length; ++i) {
- Path p = dirs[i];
-
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] matches = fs.globStatus(p, inputFilter);
- if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + p));
- } else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
- } else {
- for (FileStatus globStat : matches) {
- if (globStat.isDirectory()) {
- // One level only: children of a matched directory are not expanded further.
- for (FileStatus stat : fs.listStatus(globStat.getPath(),
- inputFilter)) {
- result.add(stat);
- }
- } else {
- result.add(globStat);
- }
- }
- }
- }
-
- if (!errors.isEmpty()) {
- throw new InvalidInputException(errors);
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- /**
-  * Reports whether the given file may be divided into several fragments.
-  * The answer comes from a scanner opened on the whole file; the scanner is
-  * closed even when the check itself throws.
-  *
-  * @param meta   table meta used to open the scanner
-  * @param schema schema used to open the scanner
-  * @param path   the file to check
-  * @param status status of that file; supplies the file length
-  * @return whatever the scanner's isSplittable() reports
-  * @throws IOException if the scanner cannot be opened or closed
-  */
- protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
-   Scanner scanner = getFileScanner(meta, schema, path, status);
-   try {
-     return scanner.isSplittable();
-   } finally {
-     scanner.close();
-   }
- }
-
- // Not referenced in the part of the class visible here; presumably the tolerance
- // factor for the last split, as in Hadoop's FileInputFormat - TODO confirm.
- private static final double SPLIT_SLOP = 1.1; // 10% slop
-
- /**
-  * Finds the block that contains {@code offset}.
-  *
-  * @param blkLocations blocks of one file
-  * @param offset       byte position inside that file
-  * @return index of the first block whose range [offset, offset + length) covers the position
-  * @throws IllegalArgumentException if no block covers it
-  */
- protected int getBlockIndex(BlockLocation[] blkLocations,
-                             long offset) {
-   int index = 0;
-   while (index < blkLocations.length) {
-     BlockLocation block = blkLocations[index];
-     long blockStart = block.getOffset();
-     if (blockStart <= offset && offset < blockStart + block.getLength()) {
-       return index;
-     }
-     index++;
-   }
-
-   // Nothing matched: report the valid range using the end of the last block.
-   BlockLocation tail = blkLocations[blkLocations.length - 1];
-   long fileLength = tail.getOffset() + tail.getLength() - 1;
-   throw new IllegalArgumentException("Offset " + offset +
-       " is outside of file (0.." +
-       fileLength + ")");
- }
-
- /**
- * Factory for the fragment covering [start, start + length) of a file.
- * Sub-classes can override it to return sub-types.
- */
- protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
- return new FileFragment(fragmentId, file, start, length);
- }
-
- /** Same as the four-argument makeSplit, additionally passing the given hosts to the fragment. */
- protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
- String[] hosts) {
- return new FileFragment(fragmentId, file, start, length, hosts);
- }
-
- /** Factory for a fragment built directly from one block location of the file. */
- protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
- throws IOException {
- return new FileFragment(fragmentId, file, blockLocation);
- }
-
- /**
-  * Builds a single fragment for a file that must not be split (e.g. a
-  * gzip-compressed text file). The fragment's hosts are the hosts holding
-  * the most blocks of the file, most frequent first; as many hosts are
-  * chosen as the first block has.
-  */
- protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
-                                     BlockLocation[] blkLocations) throws IOException {
-
-   // Count how many blocks each host stores.
-   Map<String, Integer> blocksPerHost = new HashMap<String, Integer>();
-   for (BlockLocation location : blkLocations) {
-     for (String host : location.getHosts()) {
-       Integer seen = blocksPerHost.get(host);
-       blocksPerHost.put(host, seen == null ? 1 : seen + 1);
-     }
-   }
-
-   // Ascending by block count; the busiest hosts end up at the tail.
-   List<Map.Entry<String, Integer>> ranked =
-       new ArrayList<Map.Entry<String, Integer>>(blocksPerHost.entrySet());
-   Collections.sort(ranked, new Comparator<Map.Entry<String, Integer>>() {
-     @Override
-     public int compare(Map.Entry<String, Integer> left, Map.Entry<String, Integer> right) {
-       return left.getValue().compareTo(right.getValue());
-     }
-   });
-
-   String[] hosts = new String[blkLocations[0].getHosts().length];
-   int tail = ranked.size() - 1;
-   for (int i = 0; i < hosts.length; i++) {
-     hosts[i] = ranked.get(tail - i).getKey();
-   }
-   return new FileFragment(fragmentId, file, start, length, hosts);
- }
-
- /**
- * Get the minimum split size, read from the MINIMUM_SPLIT_SIZE
- * configuration variable.
- *
- * @return the minimum number of bytes that can be in a split
- */
- public long getMinSplitSize() {
- return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
- }
-
- /**
-  * Maps volume ids to disk ids. A volume id that is null or whose hash
-  * code is not positive maps to -1; any other maps to its hash code minus
-  * the hash code of {@code zeroVolumeId}.
-  */
- private int[] getDiskIds(VolumeId[] volumeIds) {
-   int[] diskIds = new int[volumeIds.length];
-   for (int i = 0; i < diskIds.length; i++) {
-     VolumeId volume = volumeIds[i];
-     boolean usable = volume != null && volume.hashCode() > 0;
-     diskIds[i] = usable ? volume.hashCode() - zeroVolumeId.hashCode() : -1;
-   }
-   return diskIds;
- }
-
-  /**
-   * Builds a map from host name to the set of disk ids seen for that host across the fragments.
-   *
-   * Every host of every fragment gets an entry, even when no valid disk id is known for it.
-   * Disk ids are matched to hosts by array position; a disk id is recorded only when one exists
-   * at that position and is greater than -1.
-   *
-   * @param frags fragments to scan
-   * @return host name to disk id set; a set may be empty
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        // Bounds-check against diskIds itself: it is indexed by the hosts index and may be
-        // missing or shorter than the hosts array.
-        if (diskIds != null && i < diskIds.length && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
- /**
- * Lists the files under the given input paths and turns them into fragments (splits).
- *
- * Fragments built per block location are collected separately and passed to
- * setVolumeMeta together with their block locations before being appended to the result.
- *
- * @param tableName used as the fragment id of every created fragment
- * @param meta passed to isSplittable
- * @param schema passed to isSplittable
- * @param inputs files or directories to scan
- * @return all created fragments
- * @throws IOException if the file system cannot be queried
- */
- public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
- throws IOException {
- // generate splits
-
- // splits: fragments that are complete as created.
- // volumeSplits / blockLocations: per-block fragments and their block locations, appended in
- // the same order so that setVolumeMeta can pair them up by position.
- List<Fragment> splits = Lists.newArrayList();
- List<Fragment> volumeSplits = Lists.newArrayList();
- List<BlockLocation> blockLocations = Lists.newArrayList();
-
- for (Path p : inputs) {
- FileSystem fs = p.getFileSystem(conf);
- ArrayList<FileStatus> files = Lists.newArrayList();
- // A plain file is used directly; anything else is expanded through listStatus.
- if (fs.isFile(p)) {
- files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
- } else {
- files.addAll(listStatus(p));
- }
-
- int previousSplitSize = splits.size();
- for (FileStatus file : files) {
- Path path = file.getPath();
- long length = file.getLen();
- if (length > 0) {
- // Get locations of blocks of file
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- boolean splittable = isSplittable(meta, schema, path, file);
- if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-
- if (splittable) {
- // One fragment per block location.
- for (BlockLocation blockLocation : blkLocations) {
- volumeSplits.add(makeSplit(tableName, path, blockLocation));
- }
- blockLocations.addAll(Arrays.asList(blkLocations));
-
- } else { // Non splittable
- // If the first block is at least as long as the file, per-block fragments are still
- // used; otherwise the whole file becomes a single non-split fragment.
- long blockSize = blkLocations[0].getLength();
- if (blockSize >= length) {
- blockLocations.addAll(Arrays.asList(blkLocations));
- for (BlockLocation blockLocation : blkLocations) {
- volumeSplits.add(makeSplit(tableName, path, blockLocation));
- }
- } else {
- splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
- }
- }
-
- } else {
- if (splittable) {
-
- long minSize = Math.max(getMinSplitSize(), 1);
-
- long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
- long splitSize = Math.max(minSize, blockSize);
- long bytesRemaining = length;
-
- // for s3
- // Emit splitSize-sized fragments while the remaining bytes exceed SPLIT_SLOP times splitSize.
- while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- bytesRemaining -= splitSize;
- }
- // Whatever is left becomes the last fragment of the file.
- if (bytesRemaining > 0) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
- blkLocations[blkIndex].getHosts()));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
- }
- }
- } else {
- //for zero length files
- splits.add(makeSplit(tableName, path, 0, length));
- }
- }
- if(LOG.isDebugEnabled()){
- LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
- }
- }
-
- // Combine original fileFragments with new VolumeId information
- setVolumeMeta(volumeSplits, blockLocations);
- splits.addAll(volumeSplits);
- LOG.info("Total # of splits: " + splits.size());
- return splits;
- }
-
- private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
- throws IOException {
-
- int locationSize = blockLocations.size();
- int splitSize = splits.size();
- if (locationSize == 0 || splitSize == 0) return;
-
- if (locationSize != splitSize) {
- // splits and locations don't match up
- LOG.warn("Number of block locations not equal to number of splits: "
- + "#locations=" + locationSize
- + " #splits=" + splitSize);
- return;
- }
-
- DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
- int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
- int blockLocationIdx = 0;
-
- Iterator<Fragment> iter = splits.iterator();
- while (locationSize > blockLocationIdx) {
-
- int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
- List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
- //BlockStorageLocation containing additional volume location information for each replica of each block.
- BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
-
- for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
- ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
- blockLocationIdx++;
- }
- }
- LOG.info("# of splits with volumeId " + splitSize);
- }
-
- private static class InvalidInputException extends IOException {
- List<IOException> errors;
- public InvalidInputException(List<IOException> errors) {
- this.errors = errors;
- }
-
- @Override
- public String getMessage(){
- StringBuffer sb = new StringBuffer();
- int messageLimit = Math.min(errors.size(), 10);
- for (int i = 0; i < messageLimit ; i ++) {
- sb.append(errors.get(i).getMessage()).append("\n");
- }
-
- if(messageLimit < errors.size())
- sb.append("skipped .....").append("\n");
-
- return sb.toString();
- }
- }
-
- @Override
- public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException {
- return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath()));
- }
-
- @Override
- public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
- if (!tableDesc.isExternal()) {
- String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
- String databaseName = splitted[0];
- String simpleTableName = splitted[1];
-
- // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
- Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
- tableDesc.setPath(tablePath.toUri());
- } else {
- Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
- }
-
- Path path = new Path(tableDesc.getPath());
-
- FileSystem fs = path.getFileSystem(conf);
- TableStats stats = new TableStats();
- if (tableDesc.isExternal()) {
- if (!fs.exists(path)) {
- LOG.error(path.toUri() + " does not exist");
- throw new IOException("ERROR: " + path.toUri() + " does not exist");
- }
- } else {
- fs.mkdirs(path);
- }
-
- long totalSize = 0;
-
- try {
- totalSize = calculateSize(path);
- } catch (IOException e) {
- LOG.warn("Cannot calculate the size of the relation", e);
- }
-
- stats.setNumBytes(totalSize);
-
- if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
- stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
- }
-
- tableDesc.setStats(stats);
- }
-
- @Override
- public void purgeTable(TableDesc tableDesc) throws IOException {
- try {
- Path path = new Path(tableDesc.getPath());
- FileSystem fs = path.getFileSystem(conf);
- LOG.info("Delete table data dir: " + path);
- fs.delete(path, true);
- } catch (IOException e) {
- throw new InternalError(e.getMessage());
- }
- }
-
-  /**
-   * Returns one page of whole-file fragments for the non-empty data files of a table.
-   *
-   * Files are collected by {@code getNonZeroLengthDataFiles}, with {@code currentPage} passed as
-   * the start file index and {@code numFragments} as the maximum number of files. For a
-   * partitioned table the result stops at the first file whose partition directory names differ
-   * from those of the preceding file.
-   *
-   * @throws IOException on file system errors
-   */
-  @Override
-  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException {
-    Path tablePath = new Path(tableDesc.getPath());
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    // Collect the non-empty data files of the table.
-    List<FileStatus> dataFiles = new ArrayList<FileStatus>();
-    if (fs.exists(tablePath)) {
-      getNonZeroLengthDataFiles(fs, tablePath, dataFiles, currentPage, numFragments,
-          new AtomicInteger(0));
-    }
-
-    // Number of directory levels that make up a partition key (0 for unpartitioned tables).
-    int partitionDepth = 0;
-    if (tableDesc.hasPartition()) {
-      partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
-    }
-
-    List<Fragment> fragments = new ArrayList<Fragment>();
-    String[] lastPartitionKey = null;
-    for (FileStatus dataFile : dataFiles) {
-      FileFragment fragment = new FileFragment(tableDesc.getName(), dataFile.getPath(), 0, dataFile.getLen(), null);
-
-      if (partitionDepth <= 0) {
-        fragments.add(fragment);
-        continue;
-      }
-
-      // The partition key is the list of parent directory names, outermost first.
-      String[] partitionKey = new String[partitionDepth];
-      Path ancestor = fragment.getPath();
-      for (int level = partitionDepth - 1; level >= 0; level--) {
-        ancestor = ancestor.getParent();
-        partitionKey[level] = ancestor.getName();
-      }
-
-      // Only files that share the partition key of the previous file belong to this page.
-      if (lastPartitionKey != null && !Arrays.equals(lastPartitionKey, partitionKey)) {
-        break;
-      }
-      fragments.add(fragment);
-      lastPartitionKey = partitionKey;
-    }
-
-    return fragments;
-  }
-
-  /**
-   * Recursively collects non-empty files below {@code path} into {@code result}.
-   *
-   * Every non-empty file encountered increments {@code currentFileIndex}; a file is added only
-   * once that index has reached {@code startFileIndex}. Directory traversal stops as soon as
-   * {@code result} holds {@code numResultFiles} entries. Directory listings are filtered with
-   * {@code FileStorageManager.hiddenFileFilter}.
-   *
-   * @param fs               file system to query
-   * @param path             file or directory to visit
-   * @param result           receives the matching files
-   * @param startFileIndex   number of non-empty files to skip before collecting
-   * @param numResultFiles   maximum number of files to collect
-   * @param currentFileIndex running count of non-empty files seen so far
-   * @throws IOException on file system errors
-   */
-  private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
-                                         int startFileIndex, int numResultFiles,
-                                         AtomicInteger currentFileIndex) throws IOException {
-    if (!fs.isDirectory(path)) {
-      FileStatus status = fs.getFileStatus(path);
-      if (status != null && status.getLen() > 0) {
-        if (currentFileIndex.getAndIncrement() >= startFileIndex) {
-          result.add(status);
-        }
-      }
-      return;
-    }
-
-    FileStatus[] children = fs.listStatus(path, FileStorageManager.hiddenFileFilter);
-    if (children == null) {
-      return;
-    }
-
-    for (FileStatus child : children) {
-      if (result.size() >= numResultFiles) {
-        return;
-      }
-      if (child.isDirectory()) {
-        getNonZeroLengthDataFiles(fs, child.getPath(), result, startFileIndex, numResultFiles,
-            currentFileIndex);
-      } else if (child.isFile() && child.getLen() > 0) {
-        if (currentFileIndex.getAndIncrement() >= startFileIndex) {
-          result.add(child);
-        }
-      }
-    }
-  }
-
-  /**
-   * Describes the capabilities of this storage: sorted insert is never enabled, and INSERT INTO
-   * is supported for every store type except {@code StoreType.RAW}.
-   */
-  @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty property = new StorageProperty();
-    property.setSortedInsert(false);
-    property.setSupportsInsertInto(storeType != StoreType.RAW);
-    return property;
-  }
-
- /**
- * Intentionally empty: this implementation performs no work when the storage manager is closed.
- */
- @Override
- public void closeStorageManager() {
- }
-
- /**
- * Intentionally empty: this implementation does nothing with the given node before an insert
- * or CATS (presumably CREATE TABLE AS SELECT; confirm against the overridden method).
- */
- @Override
- public void beforeInsertOrCATS(LogicalNode node) throws IOException {
- }
-
- /**
- * Intentionally empty: this implementation performs no rollback work for the given node.
- */
- @Override
- public void rollbackOutputCommit(LogicalNode node) throws IOException {
- }
-
- /**
- * Always returns null; none of the arguments are inspected by this implementation.
- *
- * @return null
- */
- @Override
- public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
- Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
- throws IOException {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
deleted file mode 100644
index 8b7e2e0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-
-/**
- * An instance of FrameTuple is an immutable tuple.
- * It contains two tuples and pretends to be one instance of Tuple for
- * join qual evaluatations.
- */
-public class FrameTuple implements Tuple, Cloneable {
- private int size;
- private int leftSize;
-
- private Tuple left;
- private Tuple right;
-
- public FrameTuple() {}
-
- public FrameTuple(Tuple left, Tuple right) {
- set(left, right);
- }
-
- public void set(Tuple left, Tuple right) {
- this.size = left.size() + right.size();
- this.left = left;
- this.leftSize = left.size();
- this.right = right;
- }
-
- @Override
- public int size() {
- return size;
- }
-
- @Override
- public boolean contains(int fieldId) {
- Preconditions.checkArgument(fieldId < size,
- "Out of field access: " + fieldId);
-
- if (fieldId < leftSize) {
- return left.contains(fieldId);
- } else {
- return right.contains(fieldId - leftSize);
- }
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return get(fieldid).isNull();
- }
-
- @Override
- public boolean isNotNull(int fieldid) {
- return !isNull(fieldid);
- }
-
- @Override
- public void clear() {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException();
- }
-
- @Override
- public void setOffset(long offset) {
- throw new UnsupportedException();
- }
-
- @Override
- public long getOffset() {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(Datum [] values) {
- throw new UnsupportedException();
- }
-
- @Override
- public Datum get(int fieldId) {
- Preconditions.checkArgument(fieldId < size,
- "Out of field access: " + fieldId);
-
- if (fieldId < leftSize) {
- return left.get(fieldId);
- } else {
- return right.get(fieldId - leftSize);
- }
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return get(fieldId).asBool();
- }
-
- @Override
- public byte getByte(int fieldId) {
- return get(fieldId).asByte();
- }
-
- @Override
- public char getChar(int fieldId) {
- return get(fieldId).asChar();
- }
-
- @Override
- public byte [] getBytes(int fieldId) {
- return get(fieldId).asByteArray();
- }
-
- @Override
- public short getInt2(int fieldId) {
- return get(fieldId).asInt2();
- }
-
- @Override
- public int getInt4(int fieldId) {
- return get(fieldId).asInt4();
- }
-
- @Override
- public long getInt8(int fieldId) {
- return get(fieldId).asInt8();
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return get(fieldId).asFloat4();
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return get(fieldId).asFloat8();
- }
-
- @Override
- public String getText(int fieldId) {
- return get(fieldId).asChars();
- }
-
- @Override
- public ProtobufDatum getProtobufDatum(int fieldId) {
- return (ProtobufDatum) get(fieldId);
- }
-
- @Override
- public IntervalDatum getInterval(int fieldId) {
- return (IntervalDatum) get(fieldId);
- }
-
- @Override
- public char [] getUnicodeChars(int fieldId) {
- return get(fieldId).asUnicodeChars();
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- FrameTuple frameTuple = (FrameTuple) super.clone();
- frameTuple.set(this.left.clone(), this.right.clone());
- return frameTuple;
- }
-
- @Override
- public Datum[] getValues(){
- throw new UnsupportedException();
- }
-
- public String toString() {
- boolean first = true;
- StringBuilder str = new StringBuilder();
- str.append("(");
- for(int i=0; i < size(); i++) {
- if(contains(i)) {
- if(first) {
- first = false;
- } else {
- str.append(", ");
- }
- str.append(i)
- .append("=>")
- .append(get(i));
- }
- }
- str.append(")");
- return str.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
deleted file mode 100644
index 40cad32..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * An {@link Appender} that writes the tuples of one hash-shuffle partition through a wrapped
- * {@link FileAppender}, while recording page boundaries and, per task attempt, which rows of
- * which page the attempt wrote.
- *
- * Writes, flush, close and getStats synchronize on the wrapped appender.
- */
-public class HashShuffleAppender implements Appender {
-  private static final Log LOG = LogFactory.getLog(HashShuffleAppender.class);
-
-  private FileAppender appender;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private int partId;
-
-  // Assigned from the wrapped appender at the end of close().
-  private TableStats tableStats;
-
-  //<taskId,<page start offset,<task start, task end>>>
-  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
-
-  //page start offset, length
-  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-
-  private Pair<Long, Integer> currentPage;
-
-  // Page-roll threshold. It is compared directly against appender offsets in addTuples(),
-  // so it is expressed in the unit of those offsets, not in MB.
-  private int pageSize;
-
-  private int rowNumInPage;
-
-  private int totalRows;
-
-  // File offset captured in close(); returned by getOffset() once the appender is closed.
-  private long offset;
-
-  private ExecutionBlockId ebId;
-
-  /**
-   * @param ebId     execution block this appender belongs to (used for logging)
-   * @param partId   partition id (used for logging)
-   * @param pageSize threshold after which a new page is started
-   * @param appender wrapped appender that performs the actual writes
-   */
-  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
-    this.ebId = ebId;
-    this.partId = partId;
-    this.appender = appender;
-    this.pageSize = pageSize;
-  }
-
-  /**
-   * Resets the page and task bookkeeping. The wrapped appender is not initialized here.
-   */
-  @Override
-  public void init() throws IOException {
-    currentPage = new Pair<Long, Integer>(0L, 0);
-    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
-    rowNumInPage = 0;
-  }
-
-  /**
-   * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition.
-   * After writing if a current page exceeds pageSize, pageOffset will be added.
-   * @param taskId task attempt that produced the tuples
-   * @param tuples tuples to append
-   * @return written bytes, or 0 if this appender is already closed
-   * @throws IOException if the wrapped appender fails
-   */
-  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return 0;
-      }
-      long currentPos = appender.getOffset();
-
-      for (Tuple eachTuple: tuples) {
-        appender.addTuple(eachTuple);
-      }
-      long posAfterWritten = appender.getOffset();
-
-      int writtenBytes = (int)(posAfterWritten - currentPos);
-
-      // Record the row range [rowNumInPage, nextRowNum) of the current page for this task.
-      int nextRowNum = rowNumInPage + tuples.size();
-      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
-      if (taskIndexes == null) {
-        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-        taskTupleIndexes.put(taskId, taskIndexes);
-      }
-      taskIndexes.add(
-          new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(),
-              new Pair<Integer, Integer>(rowNumInPage, nextRowNum)));
-      rowNumInPage = nextRowNum;
-
-      // Roll over to a new page once the current one has grown beyond the threshold.
-      if (posAfterWritten - currentPage.getFirst() > pageSize) {
-        nextPage(posAfterWritten);
-        rowNumInPage = 0;
-      }
-
-      totalRows += tuples.size();
-      return writtenBytes;
-    }
-  }
-
-  /**
-   * @return the offset captured at close time if closed, otherwise the wrapped appender's offset
-   */
-  public long getOffset() throws IOException {
-    if (closed.get()) {
-      return offset;
-    } else {
-      return appender.getOffset();
-    }
-  }
-
-  /** Finishes the current page at {@code pos} and starts a new, empty page there. */
-  private void nextPage(long pos) {
-    currentPage.setSecond((int) (pos - currentPage.getFirst()));
-    pages.add(currentPage);
-    currentPage = new Pair<Long, Integer>(pos, 0);
-  }
-
-  /** Not supported; always throws. Use {@link #addTuples(QueryUnitAttemptId, List)}. */
-  @Override
-  public void addTuple(Tuple t) throws IOException {
-    throw new IOException("Not support addTuple, use addTuples()");
-  }
-
-  @Override
-  public void flush() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-    }
-  }
-
-  /**
-   * @return page size threshold multiplied by the number of finished pages
-   */
-  @Override
-  public long getEstimatedOutputSize() throws IOException {
-    // Widen before multiplying: int * int would overflow once the product exceeds 2 GiB.
-    return (long) pageSize * pages.size();
-  }
-
-  /**
-   * Flushes and closes the wrapped appender, finishing the last page if it holds data.
-   * Calling close on an already closed appender does nothing.
-   */
-  @Override
-  public void close() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-      offset = appender.getOffset();
-      if (offset > currentPage.getFirst()) {
-        nextPage(offset);
-      }
-      appender.close();
-      // Log at the level that the guard checks for.
-      if (LOG.isDebugEnabled()) {
-        if (!pages.isEmpty()) {
-          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
-              + ", lastPage=" + pages.get(pages.size() - 1));
-        } else {
-          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
-        }
-      }
-      closed.set(true);
-      tableStats = appender.getStats();
-    }
-  }
-
-  /** Intentionally empty in this class. */
-  @Override
-  public void enableStats() {
-  }
-
-  /** @return the statistics reported by the wrapped appender */
-  @Override
-  public TableStats getStats() {
-    synchronized(appender) {
-      return appender.getStats();
-    }
-  }
-
-  /** @return the finished pages as (start offset, length) pairs; the internal list itself */
-  public List<Pair<Long, Integer>> getPages() {
-    return pages;
-  }
-
-  /** @return the per-task row indexes; the internal map itself */
-  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
-    return taskTupleIndexes;
-  }
-
-  /**
-   * @return a new list containing the row indexes of all tasks still registered, i.e. those for
-   *         which {@link #taskFinished(QueryUnitAttemptId)} has not been called
-   */
-  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
-    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-
-    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
-      merged.addAll(eachFailureIndex);
-    }
-
-    return merged;
-  }
-
-  /** Drops the row indexes recorded for the given task attempt. */
-  public void taskFinished(QueryUnitAttemptId taskId) {
-    taskTupleIndexes.remove(taskId);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
deleted file mode 100644
index 33a9233..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Creates, caches and closes the {@link HashShuffleAppender}s of each execution block,
- * one appender per partition id.
- *
- * Access to the appender map is guarded by synchronizing on the map itself.
- */
-public class HashShuffleAppenderManager {
-  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
-
-  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
-      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
-  private TajoConf systemConf;
-  private FileSystem defaultFS;
-  private FileSystem localFS;
-  private LocalDirAllocator lDirAllocator;
-  // SHUFFLE_HASH_APPENDER_PAGE_VOLUME multiplied by 1024 * 1024.
-  private int pageSize;
-
-  /**
-   * @param systemConf configuration used for the local dir allocator, file systems and page size
-   * @throws IOException if a file system cannot be obtained
-   */
-  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
-    this.systemConf = systemConf;
-
-    // initialize LocalDirAllocator
-    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-
-    // initialize DFS and LocalFileSystems
-    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
-    localFS = FileSystem.getLocal(systemConf);
-    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
-  }
-
-  /**
-   * Returns the appender of the given partition of an execution block, creating and
-   * initializing it (including its data file's parent directory) on first use.
-   *
-   * @param tajoConf  configuration used to obtain the file storage manager
-   * @param ebId      execution block id
-   * @param partId    partition id
-   * @param meta      table meta passed to the underlying appender
-   * @param outSchema schema passed to the underlying appender
-   * @return the cached or newly created appender
-   * @throws IOException if the data file location or the appender cannot be set up
-   */
-  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
-                                         TableMeta meta, Schema outSchema) throws IOException {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
-
-      if (partitionAppenderMap == null) {
-        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
-        appenderMap.put(ebId, partitionAppenderMap);
-      }
-
-      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
-      if (partitionAppenderMeta == null) {
-        Path dataFile = getDataFile(ebId, partId);
-        FileSystem fs = dataFile.getFileSystem(systemConf);
-        if (fs.exists(dataFile)) {
-          FileStatus status = fs.getFileStatus(dataFile);
-          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
-        }
-
-        if (!fs.exists(dataFile.getParent())) {
-          fs.mkdirs(dataFile.getParent());
-        }
-        FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(
-            tajoConf, null).getAppender(meta, outSchema, dataFile);
-        appender.enableStats();
-        appender.init();
-
-        partitionAppenderMeta = new PartitionAppenderMeta();
-        partitionAppenderMeta.partId = partId;
-        partitionAppenderMeta.dataFile = dataFile;
-        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
-        partitionAppenderMeta.appender.init();
-        partitionAppenderMap.put(partId, partitionAppenderMeta);
-
-        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
-      }
-
-      return partitionAppenderMeta.appender;
-    }
-  }
-
-  /**
-   * @return {@code partId} modulo the configured {@code HASH_SHUFFLE_PARENT_DIRS} value
-   */
-  public static int getPartParentId(int partId, TajoConf tajoConf) {
-    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
-  }
-
-  /**
-   * Computes the local data file of a partition:
-   * {@code <local dir>/<queryId>/output/<ebId>/hash-shuffle/<parent id>/<partId>}.
-   * Any failure is logged and rethrown wrapped in an IOException.
-   */
-  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
-    try {
-      // the base dir for an output dir
-      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
-      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
-      //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
-
-      // If EB has many partition, too many shuffle file are in single directory.
-      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Closes every appender of the given execution block and returns one intermediate entry per
-   * partition.
-   *
-   * All appenders are closed even if some of them fail; in that case every failure is logged
-   * and the first one is rethrown after the remaining appenders have been closed, so that no
-   * appender of an already unregistered execution block is left open.
-   *
-   * @param ebId execution block id
-   * @return the intermediate entries, or null if no appender was registered for {@code ebId}
-   * @throws IOException the first failure raised while closing an appender
-   */
-  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
-    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
-    synchronized (appenderMap) {
-      partitionAppenderMap = appenderMap.remove(ebId);
-    }
-
-    if (partitionAppenderMap == null) {
-      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
-      return null;
-    }
-
-    // Send Intermediate data to QueryMaster.
-    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
-    IOException firstFailure = null;
-    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
-      try {
-        eachMeta.appender.close();
-        HashShuffleIntermediate intermediate =
-            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
-                eachMeta.appender.getPages(),
-                eachMeta.appender.getMergedTupleIndexes());
-        intermEntries.add(intermediate);
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        // Keep going so the remaining appenders are still closed.
-        if (firstFailure == null) {
-          firstFailure = e;
-        }
-      }
-    }
-
-    if (firstFailure != null) {
-      throw firstFailure;
-    }
-
-    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
-
-    return intermEntries;
-  }
-
-  /**
-   * Tells every appender of the task's execution block that the task attempt has finished.
-   * Does nothing when no appender is registered for that execution block.
-   */
-  public void finalizeTask(QueryUnitAttemptId taskId) {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
-          appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
-      if (partitionAppenderMap == null) {
-        return;
-      }
-
-      for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
-        eachAppender.appender.taskFinished(taskId);
-      }
-    }
-  }
-
-  /**
-   * Summary of one closed partition: its id, the volume passed in at construction, its pages
-   * and the merged tuple indexes collected from the appender.
-   */
-  public static class HashShuffleIntermediate {
-    private int partId;
-
-    private long volume;
-
-    //[<page start offset,<task start, task end>>]
-    private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
-
-    //[<page start offset, length>]
-    private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-
-    public HashShuffleIntermediate(int partId, long volume,
-                                   List<Pair<Long, Integer>> pages,
-                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
-      this.partId = partId;
-      this.volume = volume;
-      this.failureTskTupleIndexes = failureTskTupleIndexes;
-      this.pages = pages;
-    }
-
-    public int getPartId() {
-      return partId;
-    }
-
-    public long getVolume() {
-      return volume;
-    }
-
-    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
-      return failureTskTupleIndexes;
-    }
-
-    public List<Pair<Long, Integer>> getPages() {
-      return pages;
-    }
-  }
-
-  /** Bookkeeping record for one partition: its id, appender and data file. */
-  static class PartitionAppenderMeta {
-    int partId;
-    HashShuffleAppender appender;
-    Path dataFile;
-
-    public int getPartId() {
-      return partId;
-    }
-
-    public HashShuffleAppender getAppender() {
-      return appender;
-    }
-
-    public Path getDataFile() {
-      return dataFile;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
deleted file mode 100644
index bfbe478..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-
-import java.util.Arrays;
-
-/**
- * A {@link Tuple} that holds each field as raw serialized bytes and converts a
- * field into a {@link Datum} only when it is first read.
- *
- * <p>Every field has two slots: {@code textBytes[i]} holds the not-yet-parsed
- * bytes and {@code values[i]} holds the parsed datum. Reading a field parses
- * its bytes once, caches the result in {@code values} and releases the bytes.
- * {@code textBytes} may be shorter than the schema (a row with fewer fields
- * than columns); fields beyond its end read as {@link NullDatum}.
- *
- * <p>Note that read operations ({@link #get(int)}, {@link #getValues()},
- * {@link #toString()}, {@link #equals(Object)}, {@link #hashCode()}) mutate the
- * internal cache as a side effect of parsing.
- */
-public class LazyTuple implements Tuple, Cloneable {
-  /** Value reported by {@link #getOffset()}. */
-  private long offset;
-  /** Parsed datum per field; {@code null} until the field is read or put. */
-  private Datum[] values;
-  /** Unparsed bytes per field; an entry is {@code null} once parsed, overwritten or absent. */
-  private byte[][] textBytes;
-  /** Supplies the column definition handed to the deserializer. */
-  private Schema schema;
-  /** Passed to the deserializer as its last argument (the serialized form of null). */
-  private byte[] nullBytes;
-  /** Converts a field's bytes into a {@link Datum}. */
-  private SerializerDeserializer serializeDeserialize;
-
-  /**
-   * Creates a tuple that uses {@link NullDatum}'s text bytes as the null
-   * representation and a {@link TextSerializerDeserializer} for parsing.
-   *
-   * @param schema schema describing the columns
-   * @param textBytes raw bytes per field; kept by reference
-   * @param offset offset to report from {@link #getOffset()}
-   */
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
-    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
-  }
-
-  /**
-   * @param schema schema describing the columns; its size fixes the tuple size
-   * @param textBytes raw bytes per field; kept by reference, may be shorter than the schema
-   * @param offset offset to report from {@link #getOffset()}
-   * @param nullBytes null representation handed to {@code serde}
-   * @param serde deserializer used to parse a field on first access
-   */
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
-    this.schema = schema;
-    this.textBytes = textBytes;
-    this.values = new Datum[schema.size()];
-    this.offset = offset;
-    this.nullBytes = nullBytes;
-    this.serializeDeserialize = serde;
-  }
-
-  /**
-   * Copy constructor. Parses every pending field of {@code tuple} (via
-   * {@link #getValues()}) and starts the copy with those datums and no
-   * pending bytes.
-   *
-   * @param tuple tuple to copy
-   */
-  public LazyTuple(LazyTuple tuple) {
-    this.values = tuple.getValues();
-    this.offset = tuple.offset;
-    this.schema = tuple.schema;
-    this.textBytes = new byte[size()][];
-    this.nullBytes = tuple.nullBytes;
-    this.serializeDeserialize = tuple.serializeDeserialize;
-  }
-
-  @Override
-  public int size() {
-    return values.length;
-  }
-
-  /**
-   * @return {@code true} if the field has either pending bytes or a cached datum
-   */
-  @Override
-  public boolean contains(int fieldid) {
-    return hasTextBytes(fieldid) || values[fieldid] != null;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return get(fieldid).isNull();
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return !isNull(fieldid);
-  }
-
-  /** Drops every cached datum and every pending byte array. */
-  @Override
-  public void clear() {
-    // The two arrays can differ in length (short row), so clear them independently.
-    Arrays.fill(values, null);
-    Arrays.fill(textBytes, null);
-  }
-
-  //////////////////////////////////////////////////////
-  // Setter
-  //////////////////////////////////////////////////////
-  @Override
-  public void put(int fieldId, Datum value) {
-    values[fieldId] = value;
-    dropTextBytes(fieldId);
-  }
-
-  /**
-   * Stores {@code values} into consecutive fields starting at {@code fieldId}.
-   * Only the overwritten fields lose their pending bytes; other fields that
-   * have not been parsed yet remain readable.
-   */
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
-      this.values[i] = values[j];
-      dropTextBytes(i);
-    }
-  }
-
-  /** Stores the fields of {@code tuple} into consecutive fields starting at {@code fieldId}. */
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
-      values[i] = tuple.get(j);
-      dropTextBytes(i);
-    }
-  }
-
-  /**
-   * Overwrites every field with the first {@link #size()} elements of
-   * {@code values} and discards all pending bytes.
-   */
-  @Override
-  public void put(Datum[] values) {
-    System.arraycopy(values, 0, this.values, 0, size());
-    // Every field was just overwritten, so no pending bytes survive.
-    this.textBytes = new byte[size()][];
-  }
-
-  //////////////////////////////////////////////////////
-  // Getter
-  //////////////////////////////////////////////////////
-  /**
-   * Returns the datum of a field, parsing its pending bytes on first access.
-   *
-   * @return the cached or freshly parsed datum; {@link NullDatum} when the row
-   *         had no bytes for this field position or parsing failed; {@code null}
-   *         when the field has neither bytes nor a value (non-projected field)
-   */
-  @Override
-  public Datum get(int fieldId) {
-    if (values[fieldId] != null) {
-      return values[fieldId];
-    }
-
-    if (textBytes.length <= fieldId) {
-      values[fieldId] = NullDatum.get();  // split error. (col : 3, separator: ',', row text: "a,")
-    } else if (textBytes[fieldId] != null) {
-      try {
-        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
-            textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
-      } catch (Exception e) {
-        // Deliberate best effort: a field that cannot be deserialized reads as null.
-        values[fieldId] = NullDatum.get();
-      }
-      textBytes[fieldId] = null;
-    }
-    // Otherwise: non-projection, the field stays null.
-    return values[fieldId];
-  }
-
-  @Override
-  public void setOffset(long offset) {
-    this.offset = offset;
-  }
-
-  @Override
-  public long getOffset() {
-    return this.offset;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return get(fieldId).asBool();
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return get(fieldId).asByte();
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return get(fieldId).asChar();
-  }
-
-  @Override
-  public byte [] getBytes(int fieldId) {
-    return get(fieldId).asByteArray();
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return get(fieldId).asInt2();
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return get(fieldId).asInt4();
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return get(fieldId).asInt8();
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return get(fieldId).asFloat4();
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return get(fieldId).asFloat8();
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return get(fieldId).asChars();
-  }
-
-  /** Not supported by this tuple; always throws {@link UnsupportedException}. */
-  @Override
-  public ProtobufDatum getProtobufDatum(int fieldId) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public IntervalDatum getInterval(int fieldId) {
-    return (IntervalDatum) get(fieldId);
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    return get(fieldId).asUnicodeChars();
-  }
-
-  /**
-   * Renders the non-null fields as {@code (index=>datum, ...)}. Parses every
-   * pending field as a side effect.
-   */
-  public String toString() {
-    boolean first = true;
-    StringBuilder str = new StringBuilder();
-    str.append("(");
-    Datum d;
-    for (int i = 0; i < values.length; i++) {
-      d = get(i);
-      if (d != null) {
-        if (first) {
-          first = false;
-        } else {
-          str.append(", ");
-        }
-        str.append(i)
-            .append("=>")
-            .append(d);
-      }
-    }
-    str.append(")");
-    return str.toString();
-  }
-
-  /**
-   * Hashes the fully parsed values, the same data {@link #equals(Object)}
-   * compares, so equal tuples hash alike and the hash does not change when
-   * fields are lazily parsed later.
-   */
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(getValues());
-  }
-
-  /**
-   * @return a new array holding the datum of every field; parses every pending
-   *         field as a side effect
-   */
-  @Override
-  public Datum[] getValues() {
-    Datum[] datums = new Datum[values.length];
-    for (int i = 0; i < values.length; i++) {
-      datums[i] = get(i);
-    }
-    return datums;
-  }
-
-  /**
-   * @return a copy with its own value array (the datums themselves are shared)
-   *         and no pending bytes
-   */
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    LazyTuple lazyTuple = (LazyTuple) super.clone();
-
-    lazyTuple.values = getValues(); //shallow copy
-    lazyTuple.textBytes = new byte[size()][];
-    return lazyTuple;
-  }
-
-  /** Two tuples are equal when their parsed value arrays are equal element by element. */
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof Tuple) {
-      Tuple other = (Tuple) obj;
-      return Arrays.equals(getValues(), other.getValues());
-    }
-    return false;
-  }
-
-  /** Tells whether unparsed bytes are pending for the field; tolerates a short {@code textBytes}. */
-  private boolean hasTextBytes(int fieldId) {
-    return fieldId < textBytes.length && textBytes[fieldId] != null;
-  }
-
-  /** Discards the pending bytes of a field, if the row has a slot for it. */
-  private void dropTextBytes(int fieldId) {
-    if (fieldId < textBytes.length) {
-      textBytes[fieldId] = null;
-    }
-  }
-}