You are viewing a plain text version of this content. The canonical link for it is here.
Posted to codereview@trafodion.apache.org by selvaganesang <gi...@git.apache.org> on 2018/01/26 16:52:58 UTC

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

GitHub user selvaganesang opened a pull request:

    https://github.com/apache/trafodion/pull/1417

    [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for t…

    …ext formatted hive tables
    
    Part-1 changes.
    
    Created a new class org.trafodion.sql.HDFSClient. Any direct HDFS access
    will be routed to this class via JNI instead of using libhdfs.
    
    Modified the existing code expect for the following to route the HDFS request via this class
    1. LOB access
    2. Direct HDFS scan of the table
    3. Sample data creation during update stats
    
    Added a new class org.trafodio.sql.HdfsScan for scanning one or many ranges of a Hive
    text formatted table. This class will be used for Direct HDFS scan in near future.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/selvaganesang/trafodion hdfs_scan_improvements

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/trafodion/pull/1417.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1417
    
----
commit 60db153329d1ee7088f6805ef3c5eb9eb8b600de
Author: selvaganesang <se...@...>
Date:   2018-01-26T16:40:37Z

    [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables
    
    Part-1 changes.
    
    Created a new class org.trafodion.sql.HDFSClient. Any direct HDFS access
    will be routed to this class via JNI instead of using libhdfs.
    
    Modified the existing code expect for the following to route the HDFS request via this class
    1. LOB access
    2. Direct HDFS scan of the table
    3. Sample data creation during update stats
    
    Added a new class org.trafodio.sql.HdfsScan for scanning one or many ranges of a Hive
    text formatted table. This class will be used for Direct HDFS scan in near future.

----


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by eowhadi <gi...@git.apache.org>.
Github user eowhadi commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164189696
  
    --- Diff: core/sql/executor/ExHbaseAccess.cpp ---
    @@ -502,6 +506,8 @@ void ExHbaseAccessTcb::freeResources()
          NADELETEBASIC(directRowBuffer_, getHeap());
       if (colVal_.val != NULL)
          NADELETEBASIC(colVal_.val, getHeap());
    +  if (hdfsClient_ != NULL) 
    +     NADELETE(hdfsClient_, HdfsClient, getHeap());
     }
    --- End diff --
    
    I am wondering if there is a need to delete the new introduces loggingFileName_ here?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167473755
  
    --- Diff: core/sql/executor/ExExeUtilGet.cpp ---
    @@ -3521,13 +3521,9 @@ ExExeUtilGetHbaseObjectsTcb::ExExeUtilGetHbaseObjectsTcb(
          ex_globals * glob)
          : ExExeUtilGetMetadataInfoTcb( exe_util_tdb, glob)
     {
    -  int jniDebugPort = 0;
    -  int jniDebugTimeout = 0;
       ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
     					(char*)exe_util_tdb.server(), 
    -					(char*)exe_util_tdb.zkPort(),
    -                                        jniDebugPort,
    -                                        jniDebugTimeout);
    +					(char*)exe_util_tdb.zkPort());
    --- End diff --
    
    DebugPort needs to be initialized as part of JVM invocation. See JavaObjectInterface:;createJVM. This debugPort is never used. Because it is per process, it needs be be made static


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167474246
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.EOFException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private int rangeNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +   private int isEOF_ = 0; 
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    +   {
    +      HDFSRead() 
    +      {
    +      }
    + 
    +      public Object call() throws IOException 
    +      {
    +         int bytesRead;
    +         int totalBytesRead = 0;
    +         if (! buf_.hasArray()) {
    --- End diff --
    
    HdfsScan supports both direct and non-direct buffers.  Currently we use Direct byte buffers wherein the ByteBuffer is wrapping the native array in C++. However, it is possible for hdfsScan to use non-direct ByteBuffer which is wrapper for byteArray in jVM memory


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167600705
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -118,15 +119,39 @@ ExHdfsScanTcb::ExHdfsScanTcb(
       , dataModCheckDone_(FALSE)
       , loggingErrorDiags_(NULL)
       , loggingFileName_(NULL)
    +  , hdfsClient_(NULL)
    +  , hdfsScan_(NULL)
    +  , hdfsStats_(NULL)
       , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries())
     {
       Space * space = (glob ? glob->getSpace() : 0);
       CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
    +  useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
    +  if (isSequenceFile())
    +     useLibhdfsScan_ = TRUE;
       lobGlob_ = NULL;
    -  const int readBufSize =  (Int32)hdfsScanTdb.hdfsBufSize_;
    -  hdfsScanBuffer_ = new(space) char[ readBufSize + 1 ]; 
    -  hdfsScanBuffer_[readBufSize] = '\0';
    -
    +  hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
    +  headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;
    +
    +  if (useLibhdfsScan_) {
    +     hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ]; 
    +     hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0';
    +  } else {
    +     hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)];
    --- End diff --
    
    This behavior remains unchanged. Let me check if there is a regression test suite already for it


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167473505
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -380,10 +407,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
       HdfsFileInfo *hdfo = NULL;
       Lng32 openType = 0;
       int changedLen = 0;
    - ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
    -   hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
    -   hdfsFileInfo *dirInfo = NULL;
    -   Int32 hdfsErrorDetail = 0;//this is errno returned form underlying hdfsOpenFile call.
    +  ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
    +  hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
    --- End diff --
    
    Didn't make any change in this line except for aligning it


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167401848
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -380,10 +407,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
       HdfsFileInfo *hdfo = NULL;
       Lng32 openType = 0;
       int changedLen = 0;
    - ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
    -   hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
    -   hdfsFileInfo *dirInfo = NULL;
    -   Int32 hdfsErrorDetail = 0;//this is errno returned form underlying hdfsOpenFile call.
    +  ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
    +  hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
    --- End diff --
    
    I could not find where port_ member of the tdb is set. Can you please explain?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164280041
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    +   {
    +      int length_;
    +
    +      HDFSRead(int length) 
    +      {
    +         length_ = length;
    +      }
    + 
    +      public Object call() throws IOException 
    +      {
    +         int bytesRead;
    +         if (buf_.hasArray())
    +            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
    +         else
    +         {
    +            buf_.limit(bufOffset_ + length_);
    +            bytesRead = fsdis_.read(buf_);
    +         }
    +         return new Integer(bytesRead);
    +      }
    +   }
    +       
    +   public HDFSClient() 
    +   {
    +   }
    + 
    +   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
    +   {
    +      bufNo_ = bufNo; 
    +      filename_ = filename;
    +      Path filepath = new Path(filename_);
    +      fs_ = FileSystem.get(filepath.toUri(),config_);
    +      fsdis_ = fs_.open(filepath);
    +      blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
    +      buf_  = buffer;
    +      bufOffset_ = 0;
    +      pos_ = position;
    +      len_ = length;
    +      if (buffer.hasArray()) 
    +         bufLen_ = buffer.array().length;
    +      else
    +      {
    +         bufLen_ = buffer.capacity();
    +         buf_.position(0);
    +      }
    +      lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
    +      if (lenRemain_ != 0)
    +      {
    +         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
    +         future_ = executorService_.submit(new HDFSRead(readLength));
    +      }
    +   }
    +
    +   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
    +   {
    +      Integer retObject = 0;
    +      int bytesRead;
    +      int readLength;
    +       
    +      if (lenRemain_ == 0)
    +         return 0;
    +      retObject = (Integer)future_.get();
    +      bytesRead = retObject.intValue();
    +      if (bytesRead == -1)
    +         return -1;
    +      bufOffset_ += bytesRead;
    +      pos_ += bytesRead;
    +      lenRemain_ -= bytesRead;
    +      if (bufOffset_ == bufLen_)
    +         return bytesRead; 
    +      else if (bufOffset_ > bufLen_)
    +         throw new IOException("Internal Error in trafHdfsRead ");
    +      if (lenRemain_ == 0)
    +         return bytesRead; 
    +      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
    +      future_ = executorService_.submit(new HDFSRead(readLength));
    +      return bytesRead;
    +   } 
    +
    +   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
    +   {
    +      int bytesRead;
    +      int totalBytesRead = 0;
    +      while (true) {
    +         bytesRead = trafHdfsRead();
    +         if (bytesRead == -1 || bytesRead == 0)
    +            return totalBytesRead;
    +         totalBytesRead += bytesRead;
    +         if (totalBytesRead == bufLen_)
    +              return totalBytesRead;
    +      }  
    +   } 
    +
    +   boolean hdfsCreate(String fname , boolean compress) throws IOException
    +   {
    +     if (logger_.isDebugEnabled()) 
    +        logger_.debug("HDFSClient.hdfsCreate() - started" );
    +      Path filePath = null;
    +      if (!compress || (compress && fname.endsWith(".gz")))
    +        filePath = new Path(fname);
    +      else
    +        filePath = new Path(fname + ".gz");
    +        
    +      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
    +      FSDataOutputStream fsOut = fs.create(filePath, true);
    +      
    +      if (compress) {
    +        GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
    +        Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
    +        outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
    +      }
    +      else
    +        outStream_ = fsOut;      
    +      if (logger_.isDebugEnabled()) 
    +         logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" );
    +      return true;
    +    }
    +    
    +    boolean hdfsWrite(byte[] buff, long len) throws IOException
    +    {
    +
    +      if (logger_.isDebugEnabled()) 
    +         logger_.debug("HDFSClient.hdfsWrite() - started" );
    +      outStream_.write(buff);
    +      outStream_.flush();
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() - bytes written and flushed:" + len  );
    +      return true;
    +    }
    +    
    +    boolean hdfsClose() throws IOException
    +    {
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - started" );
    +      if (outStream_ != null) {
    +          outStream_.close();
    +          outStream_ = null;
    +      }
    +      return true;
    +    }
    +
    +    
    +    public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
    +    {
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start");
    +      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - source Path: " + srcPathStr + 
    +                                               ", destination File:" + dstPathStr );
    +        Path srcPath = new Path(srcPathStr );
    +        srcPath = srcPath.makeQualified(srcPath.toUri(), null);
    +        FileSystem srcFs = FileSystem.get(srcPath.toUri(),config_);
    +  
    +        Path dstPath = new Path(dstPathStr);
    +        dstPath = dstPath.makeQualified(dstPath.toUri(), null);
    +        FileSystem dstFs = FileSystem.get(dstPath.toUri(),config_);
    +        
    +        if (dstFs.exists(dstPath))
    +        {
    +          if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - destination files exists" );
    +          // for this prototype we just delete the file-- will change in next code drops
    +          dstFs.delete(dstPath, false);
    --- End diff --
    
    Maybe this, and the following two lines of comments should just be removed? Are we supposed to silently delete existing file and overwrite with new merged file?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164276621
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -1948,6 +1948,54 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc)
       return 0;
     }
     
    +void ExHdfsScanTcb::handleException(NAHeap *heap,
    --- End diff --
    
    Makes me wish we had a common base class for HBaseAccess, HdfsClient and maybe SequenceFileRead/Write. Any think that requiresJava/JNI to go to another file format. Could some of this logic be shared in that common base class? This is not a request for change, simply a question that will help me understand future refactoring choices.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164279712
  
    --- Diff: core/sql/executor/HdfsClient_JNI.cpp ---
    @@ -0,0 +1,452 @@
    +//**********************************************************************
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +// **********************************************************************
    +
    +#include "QRLogger.h"
    +#include "Globals.h"
    +#include "jni.h"
    +#include "HdfsClient_JNI.h"
    +
    +// ===========================================================================
    +// ===== Class HdfsScan
    +// ===========================================================================
    +
    +JavaMethodInit* HdfsScan::JavaMethods_ = NULL;
    +jclass HdfsScan::javaClass_ = 0;
    +bool HdfsScan::javaMethodsInitialized_ = false;
    +pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
    +
    +static const char* const hdfsScanErrorEnumStr[] = 
    +{
    +};
    +
    + 
    +//////////////////////////////////////////////////////////////////////////////
    +// 
    +//////////////////////////////////////////////////////////////////////////////
    +HDFS_Scan_RetCode HdfsScan::init()
    +{
    +  static char className[]="org/trafodion/sql/HdfsScan";
    +  HDFS_Scan_RetCode rc; 
    +
    +  if (javaMethodsInitialized_)
    +    return (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); 
    +  else
    +  {
    +    pthread_mutex_lock(&javaMethodsInitMutex_);
    +    if (javaMethodsInitialized_)
    +    {
    +      pthread_mutex_unlock(&javaMethodsInitMutex_);
    +      return (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
    +    }
    +    JavaMethods_ = new JavaMethodInit[JM_LAST];
    +    
    +    JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
    +    JavaMethods_[JM_CTOR      ].jm_signature = "()V";
    +    JavaMethods_[JM_INIT_SCAN_RANGES].jm_name      = "<init>";
    +    JavaMethods_[JM_INIT_SCAN_RANGES].jm_signature = "(Ljava/lang/Object;Ljava/lang/Object;[Ljava/lang/String;[J[J)V";
    +    JavaMethods_[JM_TRAF_HDFS_READ].jm_name      = "trafHdfsRead";
    +    JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
    +   
    +    rc = (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
    +    javaMethodsInitialized_ = TRUE;
    +    pthread_mutex_unlock(&javaMethodsInitMutex_);
    +  }
    +  return rc;
    +}
    +        
    +char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
    +{
    +  if (errEnum < (HDFS_Scan_RetCode)JOI_LAST)
    +    return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum);
    +  else
    +    return (char*)hdfsScanErrorEnumStr[errEnum-HDFS_SCAN_FIRST-1];
    --- End diff --
    
    I wonder why there is a "-1" here, but no equivalent "-1"  in HdfsClient::getErrorText()


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167464350
  
    --- Diff: core/sql/generator/GenRelScan.cpp ---
    @@ -1391,6 +1391,9 @@ if (hTabStats->isOrcFile())
       hdfsscan_tdb->setUseCif(useCIF);
       hdfsscan_tdb->setUseCifDefrag(useCIFDegrag);
     
    +  if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON)
    --- End diff --
    
    Should we add a check that says if a table has any file that is compressed then we will not disable libhdfs scan? Or maybe support for compressed files is coming soon and for now we set this cqd OFF only if we thing conditions warrant it?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r165533175
  
    --- Diff: core/sql/executor/ExExeUtilGet.cpp ---
    @@ -3521,13 +3521,9 @@ ExExeUtilGetHbaseObjectsTcb::ExExeUtilGetHbaseObjectsTcb(
          ex_globals * glob)
          : ExExeUtilGetMetadataInfoTcb( exe_util_tdb, glob)
     {
    -  int jniDebugPort = 0;
    -  int jniDebugTimeout = 0;
       ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
     					(char*)exe_util_tdb.server(), 
    -					(char*)exe_util_tdb.zkPort(),
    -                                        jniDebugPort,
    -                                        jniDebugTimeout);
    +					(char*)exe_util_tdb.zkPort());
    --- End diff --
    
    Could you please comment on how we can attach jdb to say sqlci to debug some code in HBaseClient.java or HiveClient.java. I was under the impression that jniDebugPort helped with that. I see the two removed instance members have now been made static class variables of JOI. I could not catch where they were initialized though. Please point me to that code.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167467102
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.EOFException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private int rangeNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +   private int isEOF_ = 0; 
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    +   {
    +      HDFSRead() 
    +      {
    +      }
    + 
    +      public Object call() throws IOException 
    +      {
    +         int bytesRead;
    +         int totalBytesRead = 0;
    +         if (! buf_.hasArray()) {
    +            try {
    +              fsdis_.seek(pos_);
    --- End diff --
    
    Should we set readahead on this stream?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164275585
  
    --- Diff: core/sql/executor/ExHbaseIUD.cpp ---
    @@ -1158,7 +1158,7 @@ ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb(
                           "traf_upsert_err",
                           fileNum,
                           loggingFileName_);
    -   LoggingFileCreated_ = FALSE;
    +   loggingFileCreated_ = FALSE;
    --- End diff --
    
    Is this line necessary now? loggingFileCreated_ belongs to the base class now I gather. Its constructor would have done this already.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167474490
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.EOFException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private int rangeNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +   private int isEOF_ = 0; 
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    +   {
    +      HDFSRead() 
    +      {
    +      }
    + 
    +      public Object call() throws IOException 
    +      {
    +         int bytesRead;
    +         int totalBytesRead = 0;
    +         if (! buf_.hasArray()) {
    +            try {
    +              fsdis_.seek(pos_);
    --- End diff --
    
    Didn't understand the comment. We do readAhead in HdfsScan. While one buffer is being processed, the another buffer is filled up with remaining portion in the range or the next range.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/trafodion/pull/1417


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167473572
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
     
                 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
                   step_ = DONE;
    -            else
    -              step_ = INIT_HDFS_CURSOR;
    +            else {
    +              if (useLibhdfsScan_)
    +                 step_ = INIT_HDFS_CURSOR;
    +              else
    +                 step_ = SETUP_HDFS_SCAN;
    +            }
               }        
               break;
    -
    +        case SETUP_HDFS_SCAN:
    +          {   
    +             if (hdfsScan_ != NULL)
    +                NADELETE(hdfsScan_, HdfsScan, getHeap());
    +             hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, 
    +                            &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_, 
    +                            hdfsStats_, hdfsScanRetCode);
    +             if (hdfsScanRetCode != HDFS_SCAN_OK)
    +             {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", 
    +                              currContext->getJniErrorStr(), NULL);              
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             bufBegin_ = NULL;
    +             bufEnd_ = NULL;
    +             bufLogicalEnd_ = NULL;
    +             headRoomCopied_ = 0;
    +             prevRangeNum_ = -1;                         
    +             currRangeBytesRead_ = 0;                   
    +             recordSkip_ = FALSE;
    +             extraBytesRead_ = 0;
    +             step_ = TRAF_HDFS_READ;
    +          } 
    +          break;
    +        case TRAF_HDFS_READ:
    +          {
    +             hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
    +             if (hdfsScanRetCode == HDFS_SCAN_EOR) {
    +                step_ = DONE;
    +                break;
    +             }
    +             else if (hdfsScanRetCode != HDFS_SCAN_OK) {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", 
    +                              currContext->getJniErrorStr(), NULL);              
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
    +             bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
    +             if (retArray_[RANGE_NO] != prevRangeNum_) {  
    +                currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
    +                if (hdfo->getStartOffset() == 0)
    +                   recordSkip_ = FALSE;
    +                else
    +                   recordSkip_ = TRUE; 
    +             } else {
    +                currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
    +                recordSkip_ = FALSE;
    +             }
    +             if (currRangeBytesRead_ > hdfo->getBytesToRead())
    +                extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); 
    +             else
    +                extraBytesRead_ = 0;
    +             // headRoom_ is the number of extra bytes read (rangeTailIOSize)
    +             // If EOF is reached while reading the range and the extraBytes read
    +             // is less than headRoom_, then process all the data till EOF 
    +             if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
    +                extraBytesRead_ = 0;
    +             bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
    +             prevRangeNum_ = retArray_[RANGE_NO];
    +             headRoomCopied_ = 0;
    +             if (recordSkip_) {
    +		hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
    +			      hdfsScanTdb().recordDelimiter_, 
    +                              (char *)bufEnd_,
    +			      checkRangeDelimiter_, 
    +			      hdfsScanTdb().getHiveScanMode(), &changedLen);
    +                if (hdfsBufNextRow_ == NULL) {
    --- End diff --
    
    Yes


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167467533
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
    @@ -0,0 +1,289 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +// This class implements an efficient mechanism to read hdfs files
    +// Trafodion ExHdfsScan operator provides a range of scans to be performed.
    +// The range consists of a hdfs filename, offset and length to be read
    +// This class takes in two ByteBuffers. These ByteBuffer can be either direct buffers
    +// backed up native buffers or indirect buffer backed by java arrays.
    +// All the ranges are read alternating between the two buffers using ExecutorService
    +// using CachedThreadPool mechanism. 
    +// For a given HdfsScan instance, only one thread(IO thread) is scheduled to read
    +// the next full or partial buffer while the main thread processes the previously
    +// read information from the other buffer
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import org.trafodion.sql.HDFSClient;
    +
    +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.api.Table;
    +import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    +import org.apache.hadoop.fs.FileStatus;
    +import java.net.URI;
    +
    +public class HdfsScan 
    +{
    +   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
    +   private ByteBuffer buf_[];
    +   private int bufLen_[];
    +   private HDFSClient hdfsClient_[];
    +   private int currRange_;
    +   private long currPos_;
    +   private long lenRemain_;
    +   private int lastBufCompleted_ = -1;
    +   private boolean scanCompleted_;
    +   
    +   class HdfsScanRange 
    +   {
    +      String filename_;
    +      long pos_;
    +      long len_;
    +      int tdbRangeNum_;
    +      
    +      HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
    +      {
    +         filename_ = filename;
    +         pos_ = pos;
    +         len_ = len;
    +         tdbRangeNum_ = tdbRangeNum;
    +      }
    +   }
    +   
    +   private HdfsScanRange hdfsScanRanges_[];
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +   }
    +
    +   public HdfsScan() 
    +   {
    +   }
    +
    +   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException
    +   {
    +      buf_ = new ByteBuffer[2];
    +      bufLen_ = new int[2];
    +
    +      buf_[0] = buf1;
    +      buf_[1] = buf2;
    +
    +      for (int i = 0; i < 2 ; i++) {
    +          if (buf_[i].hasArray())
    +             bufLen_[i] = buf_[i].array().length;
    +          else
    +             bufLen_[i] = buf_[i].capacity();
    +      }
    +      hdfsClient_ = new HDFSClient[2];
    +      hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
    +      for (int i = 0; i < filename.length; i++) {
    +         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]);
    --- End diff --
    
    An interesting will be to create ranges, (i.e. files) with 0 bytes. We know that sqoop sometimes creates such files.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164237633
  
    --- Diff: core/sql/executor/ExFastTransport.h ---
    @@ -407,6 +408,7 @@ class ExHdfsFastExtractTcb : public ExFastExtractTcb
                               
       NABoolean isSequenceFile();
       void createSequenceFileError(Int32 sfwRetCode);
    +  void createHdfsClientFileError(Int32 sfwRetCode);
    --- End diff --
    
    Will do in my next commit though it is a just parameter name in the method declaration.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r165170894
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    --- End diff --
    
    HdfsClient is meant for both read and write to hdfs text files. Need to check if we can use for other format files too. HdfsClient is also used to create Hdfs files..
    
    I have made 2nd commit that might help to understand it better with some comments in ExHdfsScan.h


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164276782
  
    --- Diff: core/sql/executor/HdfsClient_JNI.cpp ---
    @@ -0,0 +1,452 @@
    +//**********************************************************************
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +// **********************************************************************
    +
    +#include "QRLogger.h"
    +#include "Globals.h"
    +#include "jni.h"
    +#include "HdfsClient_JNI.h"
    +
    +// ===========================================================================
    +// ===== Class HdfsScan
    +// ===========================================================================
    +
    +JavaMethodInit* HdfsScan::JavaMethods_ = NULL;
    +jclass HdfsScan::javaClass_ = 0;
    +bool HdfsScan::javaMethodsInitialized_ = false;
    +pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
    +
    +static const char* const hdfsScanErrorEnumStr[] = 
    +{
    --- End diff --
    
    I am surprised that this is empty. Is that because HdfsScan java side is now in preview and error handling has not been introduced yet?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167474575
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
    @@ -0,0 +1,289 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +// This class implements an efficient mechanism to read hdfs files
    +// Trafodion ExHdfsScan operator provides a range of scans to be performed.
    +// The range consists of a hdfs filename, offset and length to be read
    +// This class takes in two ByteBuffers. These ByteBuffer can be either direct buffers
    +// backed up native buffers or indirect buffer backed by java arrays.
    +// All the ranges are read alternating between the two buffers using ExecutorService
    +// using CachedThreadPool mechanism. 
    +// For a given HdfsScan instance, only one thread(IO thread) is scheduled to read
    +// the next full or partial buffer while the main thread processes the previously
    +// read information from the other buffer
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import org.trafodion.sql.HDFSClient;
    +
    +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.api.Table;
    +import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    +import org.apache.hadoop.fs.FileStatus;
    +import java.net.URI;
    +
    +public class HdfsScan 
    +{
    +   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
    +   private ByteBuffer buf_[];
    +   private int bufLen_[];
    +   private HDFSClient hdfsClient_[];
    +   private int currRange_;
    +   private long currPos_;
    +   private long lenRemain_;
    +   private int lastBufCompleted_ = -1;
    +   private boolean scanCompleted_;
    +   
    +   class HdfsScanRange 
    +   {
    +      String filename_;
    +      long pos_;
    +      long len_;
    +      int tdbRangeNum_;
    +      
    +      HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
    +      {
    +         filename_ = filename;
    +         pos_ = pos;
    +         len_ = len;
    +         tdbRangeNum_ = tdbRangeNum;
    +      }
    +   }
    +   
    +   private HdfsScanRange hdfsScanRanges_[];
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +   }
    +
    +   public HdfsScan() 
    +   {
    +   }
    +
    +   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException
    +   {
    +      buf_ = new ByteBuffer[2];
    +      bufLen_ = new int[2];
    +
    +      buf_[0] = buf1;
    +      buf_[1] = buf2;
    +
    +      for (int i = 0; i < 2 ; i++) {
    +          if (buf_[i].hasArray())
    +             bufLen_[i] = buf_[i].array().length;
    +          else
    +             bufLen_[i] = buf_[i].capacity();
    +      }
    +      hdfsClient_ = new HDFSClient[2];
    +      hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
    +      for (int i = 0; i < filename.length; i++) {
    +         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]);
    --- End diff --
    
    I think it should work. Let me confirm it


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by eowhadi <gi...@git.apache.org>.
Github user eowhadi commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164186395
  
    --- Diff: core/sql/executor/ExFastTransport.h ---
    @@ -407,6 +408,7 @@ class ExHdfsFastExtractTcb : public ExFastExtractTcb
                               
       NABoolean isSequenceFile();
       void createSequenceFileError(Int32 sfwRetCode);
    +  void createHdfsClientFileError(Int32 sfwRetCode);
    --- End diff --
    
    should be hdfsClientRetCode instead of sfwRetCode



---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167473694
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -1149,8 +1297,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
     	        if (hdfsScanTdb().continueOnError())
     	        {
     	          if ((pentry_down->downState.request == ex_queue::GET_N) &&
    -	              (pentry_down->downState.requestValue == matches_))
    -	            step_ = CLOSE_FILE;
    +	              (pentry_down->downState.requestValue == matches_)) {
    +                     if (useLibhdfsScan_)
    +                        step_ = CLOSE_HDFS_CURSOR;
    --- End diff --
    
    No Thanks for catching it.. Will change it back to CLOSE_HDFS_CURSOR


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164276571
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -283,6 +285,8 @@ void ExHdfsScanTcb::freeResources()
          ExpLOBinterfaceCleanup(lobGlob_, (NAHeap *)getGlobals()->getDefaultHeap());
          lobGlob_ = NULL;
       }
    +  if (hdfsClient_ != NULL) 
    +     NADELETE(hdfsClient_, HdfsClient, getHeap());
    --- End diff --
    
    Same comment as Eric on ExHBaseAccess::freeResources(). Should we release loggingFileName_ here? Constructor guarantees it is never null.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167457124
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -1149,8 +1297,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
     	        if (hdfsScanTdb().continueOnError())
     	        {
     	          if ((pentry_down->downState.request == ex_queue::GET_N) &&
    -	              (pentry_down->downState.requestValue == matches_))
    -	            step_ = CLOSE_FILE;
    +	              (pentry_down->downState.requestValue == matches_)) {
    +                     if (useLibhdfsScan_)
    +                        step_ = CLOSE_HDFS_CURSOR;
    --- End diff --
    
    Is this intentional, to change from CLOSE_FILE to CLOSE_HDFS_CURSOR?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167601286
  
    --- Diff: core/sql/generator/GenRelScan.cpp ---
    @@ -1391,6 +1391,9 @@ if (hTabStats->isOrcFile())
       hdfsscan_tdb->setUseCif(useCIF);
       hdfsscan_tdb->setUseCifDefrag(useCIFDegrag);
     
    +  if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON)
    --- End diff --
    
    I need to confirm if compression is already supported.  Are there any tests for compression in hive regressions already? If so, it is handled already.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164278919
  
    --- Diff: core/sql/executor/ExHbaseIUD.cpp ---
    @@ -1158,7 +1158,7 @@ ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb(
                           "traf_upsert_err",
                           fileNum,
                           loggingFileName_);
    -   LoggingFileCreated_ = FALSE;
    +   loggingFileCreated_ = FALSE;
    --- End diff --
    
    Yes. I will confirm and remove it


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164280205
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    --- End diff --
    
    Could you please explain how the classes HdfsClient, HdfsClient.HDFSRead and HdfsScan are related? Thank you for the nice comments in HdfsScan.java. I took HdfsClient to be the class that contains all the methods that we removed from SequenceFileWriter.  If that is true, why do we need a HDFSRead subclass? Is this for the future or is there some functionality that I missed. For error row logging do we need read?  


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167412178
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -118,15 +119,39 @@ ExHdfsScanTcb::ExHdfsScanTcb(
       , dataModCheckDone_(FALSE)
       , loggingErrorDiags_(NULL)
       , loggingFileName_(NULL)
    +  , hdfsClient_(NULL)
    +  , hdfsScan_(NULL)
    +  , hdfsStats_(NULL)
       , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries())
     {
       Space * space = (glob ? glob->getSpace() : 0);
       CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
    +  useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
    +  if (isSequenceFile())
    +     useLibhdfsScan_ = TRUE;
       lobGlob_ = NULL;
    -  const int readBufSize =  (Int32)hdfsScanTdb.hdfsBufSize_;
    -  hdfsScanBuffer_ = new(space) char[ readBufSize + 1 ]; 
    -  hdfsScanBuffer_[readBufSize] = '\0';
    -
    +  hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
    +  headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;
    +
    +  if (useLibhdfsScan_) {
    +     hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ]; 
    +     hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0';
    +  } else {
    +     hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)];
    --- End diff --
    
    Could we please test this logic with extremely wide rows. Currently we have a limitation that the maximum row size cannot be larger than hdfsScanBufMaxSize_. It will be a good test to have 10 rows of this size, say in 2 or more files and check if we can process it. Logic seems good, this is a test suggestion. As you know scanBufNaxSize can re reduced with a cqd, to avoid having to deal with rows that are several MBs wide.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167466431
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.EOFException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private int rangeNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +   private int isEOF_ = 0; 
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    +   {
    +      HDFSRead() 
    +      {
    +      }
    + 
    +      public Object call() throws IOException 
    +      {
    +         int bytesRead;
    +         int totalBytesRead = 0;
    +         if (! buf_.hasArray()) {
    --- End diff --
    
    Does this condition mean that somehow the buffer we got from the C side is empty or not accessible?


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164278860
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -1948,6 +1948,54 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc)
       return 0;
     }
     
    +void ExHdfsScanTcb::handleException(NAHeap *heap,
    --- End diff --
    
    JavaObjectInterface is the base class for all these classes.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167474597
  
    --- Diff: core/sql/executor/SequenceFileReader.h ---
    @@ -199,15 +185,6 @@ class SequenceFileWriter : public JavaObjectInterface
       
       // Close the file.
       SFW_RetCode    close();
    -
    -  SFW_RetCode    hdfsCreate(const char* path, NABoolean compress);
    -  SFW_RetCode    hdfsWrite(const char* data, Int64 size);
    -  SFW_RetCode    hdfsMergeFiles(const NAString& srcPath,
    -                                 const NAString& dstPath);
    -  SFW_RetCode    hdfsDeletePath(const NAString& delPath);
    -  SFW_RetCode    hdfsCleanUnloadPath(const NAString& uldPath );
    -  SFW_RetCode    hdfsExists(const NAString& uldPath,  NABoolean & exists );
    -  SFW_RetCode    hdfsClose();
       SFW_RetCode    release();
     
       virtual char*  getErrorText(SFW_RetCode errEnum);
    --- End diff --
    
    Yes. It can be static


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167465177
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    --- End diff --
    
    Thank you for the comments. Much appreciated.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167412036
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
     
                 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
                   step_ = DONE;
    -            else
    -              step_ = INIT_HDFS_CURSOR;
    +            else {
    +              if (useLibhdfsScan_)
    +                 step_ = INIT_HDFS_CURSOR;
    +              else
    +                 step_ = SETUP_HDFS_SCAN;
    +            }
               }        
               break;
    -
    +        case SETUP_HDFS_SCAN:
    +          {   
    +             if (hdfsScan_ != NULL)
    +                NADELETE(hdfsScan_, HdfsScan, getHeap());
    +             hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, 
    +                            &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_, 
    +                            hdfsStats_, hdfsScanRetCode);
    +             if (hdfsScanRetCode != HDFS_SCAN_OK)
    +             {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", 
    +                              currContext->getJniErrorStr(), NULL);              
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             bufBegin_ = NULL;
    +             bufEnd_ = NULL;
    +             bufLogicalEnd_ = NULL;
    +             headRoomCopied_ = 0;
    +             prevRangeNum_ = -1;                         
    +             currRangeBytesRead_ = 0;                   
    +             recordSkip_ = FALSE;
    +             extraBytesRead_ = 0;
    +             step_ = TRAF_HDFS_READ;
    +          } 
    +          break;
    +        case TRAF_HDFS_READ:
    +          {
    +             hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
    +             if (hdfsScanRetCode == HDFS_SCAN_EOR) {
    +                step_ = DONE;
    +                break;
    +             }
    +             else if (hdfsScanRetCode != HDFS_SCAN_OK) {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", 
    +                              currContext->getJniErrorStr(), NULL);              
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
    +             bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
    +             if (retArray_[RANGE_NO] != prevRangeNum_) {  
    +                currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
    +                if (hdfo->getStartOffset() == 0)
    +                   recordSkip_ = FALSE;
    +                else
    +                   recordSkip_ = TRUE; 
    +             } else {
    +                currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
    +                recordSkip_ = FALSE;
    +             }
    +             if (currRangeBytesRead_ > hdfo->getBytesToRead())
    +                extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); 
    +             else
    +                extraBytesRead_ = 0;
    +             // headRoom_ is the number of extra bytes read (rangeTailIOSize)
    +             // If EOF is reached while reading the range and the extraBytes read
    +             // is less than headRoom_, then process all the data till EOF 
    +             if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
    +                extraBytesRead_ = 0;
    +             bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
    +             prevRangeNum_ = retArray_[RANGE_NO];
    +             headRoomCopied_ = 0;
    +             if (recordSkip_) {
    +		hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
    +			      hdfsScanTdb().recordDelimiter_, 
    +                              (char *)bufEnd_,
    +			      checkRangeDelimiter_, 
    +			      hdfsScanTdb().getHiveScanMode(), &changedLen);
    +                if (hdfsBufNextRow_ == NULL) {
    --- End diff --
    
    The last record in a file sometimes has no recordDelimiter. Hive accepts this. After some trial and error, the libhdfs approach does too. Are we handling that case correctly? I cannot tell, this is a question.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164211946
  
    --- Diff: core/sql/executor/ExHbaseAccess.cpp ---
    @@ -502,6 +506,8 @@ void ExHbaseAccessTcb::freeResources()
          NADELETEBASIC(directRowBuffer_, getHeap());
       if (colVal_.val != NULL)
          NADELETEBASIC(colVal_.val, getHeap());
    +  if (hdfsClient_ != NULL) 
    +     NADELETE(hdfsClient_, HdfsClient, getHeap());
     }
    --- End diff --
    
    Yes. It is a good catch. I will fix this too in my next commit.  This won't cause memory leak as such because the heap is destroyed.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167464842
  
    --- Diff: core/sql/executor/SequenceFileReader.h ---
    @@ -199,15 +185,6 @@ class SequenceFileWriter : public JavaObjectInterface
       
       // Close the file.
       SFW_RetCode    close();
    -
    -  SFW_RetCode    hdfsCreate(const char* path, NABoolean compress);
    -  SFW_RetCode    hdfsWrite(const char* data, Int64 size);
    -  SFW_RetCode    hdfsMergeFiles(const NAString& srcPath,
    -                                 const NAString& dstPath);
    -  SFW_RetCode    hdfsDeletePath(const NAString& delPath);
    -  SFW_RetCode    hdfsCleanUnloadPath(const NAString& uldPath );
    -  SFW_RetCode    hdfsExists(const NAString& uldPath,  NABoolean & exists );
    -  SFW_RetCode    hdfsClose();
       SFW_RetCode    release();
     
       virtual char*  getErrorText(SFW_RetCode errEnum);
    --- End diff --
    
    Should this be static instead of virtual? 


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by selvaganesang <gi...@git.apache.org>.
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164278889
  
    --- Diff: core/sql/executor/HdfsClient_JNI.cpp ---
    @@ -0,0 +1,452 @@
    +//**********************************************************************
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +// **********************************************************************
    +
    +#include "QRLogger.h"
    +#include "Globals.h"
    +#include "jni.h"
    +#include "HdfsClient_JNI.h"
    +
    +// ===========================================================================
    +// ===== Class HdfsScan
    +// ===========================================================================
    +
    +JavaMethodInit* HdfsScan::JavaMethods_ = NULL;
    +jclass HdfsScan::javaClass_ = 0;
    +bool HdfsScan::javaMethodsInitialized_ = false;
    +pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
    +
    +static const char* const hdfsScanErrorEnumStr[] = 
    +{
    --- End diff --
    
    This will be populated when HdfsScan JNI methods are introduced


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164275414
  
    --- Diff: core/sql/executor/ExFastTransport.cpp ---
    @@ -1248,6 +1279,23 @@ void ExHdfsFastExtractTcb::createSequenceFileError(Int32 sfwRetCode)
       updateWorkATPDiagsArea(diagsArea);
     }
     
    +void ExHdfsFastExtractTcb::createHdfsClientFileError(Int32 hdfsClientRetCode)
    +{
    +  ContextCli *currContext = GetCliGlobals()->currContext();
    +
    +  ComDiagsArea * diagsArea = NULL;
    +  char* errorMsg = hdfsClient_->getErrorText((HDFS_Client_RetCode)hdfsClientRetCode);
    +  ExRaiseSqlError(getHeap(),
    +                  &diagsArea,
    +                  (ExeErrorCode)(8447),
    +                  NULL, NULL, NULL, NULL,
    +                  errorMsg,
    +                (char *)currContext->getJniErrorStr().data());
    +  //ex_queue_entry *pentry_down = qParent_.down->getHeadEntry();
    --- End diff --
    
    Consider removing these 2 lines of comments in next commit. There are similar lines in the createSequenceFileError() method that could be removed.


---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

Posted by sureshsubbiah <gi...@git.apache.org>.
Github user sureshsubbiah commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r164280283
  
    --- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
    @@ -0,0 +1,319 @@
    +// @@@ START COPYRIGHT @@@
    +//
    +// Licensed to the Apache Software Foundation (ASF) under one
    +// or more contributor license agreements.  See the NOTICE file
    +// distributed with this work for additional information
    +// regarding copyright ownership.  The ASF licenses this file
    +// to you under the Apache License, Version 2.0 (the
    +// "License"); you may not use this file except in compliance
    +// with the License.  You may obtain a copy of the License at
    +//
    +//   http://www.apache.org/licenses/LICENSE-2.0
    +//
    +// Unless required by applicable law or agreed to in writing,
    +// software distributed under the License is distributed on an
    +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +// KIND, either express or implied.  See the License for the
    +// specific language governing permissions and limitations
    +// under the License.
    +//
    +// @@@ END COPYRIGHT @@@
    +
    +package org.trafodion.sql;
    +
    +import org.apache.log4j.PropertyConfigurator;
    +import org.apache.log4j.Logger;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.conf.Configuration;
    +import java.nio.ByteBuffer;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.hadoop.io.compress.CodecPool;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.Compressor;
    +import org.apache.hadoop.io.compress.GzipCodec;
    +import org.apache.hadoop.io.SequenceFile.CompressionType;
    +import org.apache.hadoop.util.ReflectionUtils;
    +
    +public class HDFSClient 
    +{
    +   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    +   private static Configuration config_ = null;
    +   private static ExecutorService executorService_ = null;
    +   private static FileSystem defaultFs_ = null;
    +   private FileSystem fs_ = null;
    +   private int bufNo_;
    +   private FSDataInputStream fsdis_; 
    +   private OutputStream outStream_;
    +   private String filename_;
    +   private ByteBuffer buf_;
    +   private int bufLen_;
    +   private int bufOffset_ = 0;
    +   private long pos_ = 0;
    +   private int len_ = 0;
    +   private int lenRemain_ = 0; 
    +   private int blockSize_; 
    +   private int bytesRead_;
    +   private Future future_ = null;
    +    
    +   static {
    +      String confFile = System.getProperty("trafodion.log4j.configFile");
    +      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
    +      if (confFile == null) {
    +         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
    +      }
    +      PropertyConfigurator.configure(confFile);
    +      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
    +      executorService_ = Executors.newCachedThreadPool();
    +      try {
    +         defaultFs_ = FileSystem.get(config_);
    +      }
    +      catch (IOException ioe) {
    +         throw new RuntimeException("Exception in HDFSClient static block", ioe);
    +      }
    +   }
    +
    +   class HDFSRead implements Callable 
    +   {
    +      int length_;
    +
    +      HDFSRead(int length) 
    +      {
    +         length_ = length;
    +      }
    + 
    +      public Object call() throws IOException 
    +      {
    +         int bytesRead;
    +         if (buf_.hasArray())
    +            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
    +         else
    +         {
    +            buf_.limit(bufOffset_ + length_);
    +            bytesRead = fsdis_.read(buf_);
    +         }
    +         return new Integer(bytesRead);
    +      }
    +   }
    +       
    +   public HDFSClient() 
    +   {
    +   }
    + 
    +   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
    +   {
    +      bufNo_ = bufNo; 
    +      filename_ = filename;
    +      Path filepath = new Path(filename_);
    +      fs_ = FileSystem.get(filepath.toUri(),config_);
    +      fsdis_ = fs_.open(filepath);
    +      blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
    +      buf_  = buffer;
    +      bufOffset_ = 0;
    +      pos_ = position;
    +      len_ = length;
    +      if (buffer.hasArray()) 
    +         bufLen_ = buffer.array().length;
    +      else
    +      {
    +         bufLen_ = buffer.capacity();
    +         buf_.position(0);
    +      }
    +      lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
    +      if (lenRemain_ != 0)
    +      {
    +         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
    +         future_ = executorService_.submit(new HDFSRead(readLength));
    +      }
    +   }
    +
    +   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
    +   {
    +      Integer retObject = 0;
    +      int bytesRead;
    +      int readLength;
    +       
    +      if (lenRemain_ == 0)
    +         return 0;
    +      retObject = (Integer)future_.get();
    +      bytesRead = retObject.intValue();
    +      if (bytesRead == -1)
    +         return -1;
    +      bufOffset_ += bytesRead;
    +      pos_ += bytesRead;
    +      lenRemain_ -= bytesRead;
    +      if (bufOffset_ == bufLen_)
    +         return bytesRead; 
    +      else if (bufOffset_ > bufLen_)
    +         throw new IOException("Internal Error in trafHdfsRead ");
    +      if (lenRemain_ == 0)
    +         return bytesRead; 
    +      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
    +      future_ = executorService_.submit(new HDFSRead(readLength));
    +      return bytesRead;
    +   } 
    +
    +   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
    +   {
    +      int bytesRead;
    +      int totalBytesRead = 0;
    +      while (true) {
    +         bytesRead = trafHdfsRead();
    +         if (bytesRead == -1 || bytesRead == 0)
    +            return totalBytesRead;
    +         totalBytesRead += bytesRead;
    +         if (totalBytesRead == bufLen_)
    +              return totalBytesRead;
    +      }  
    +   } 
    +
    +   boolean hdfsCreate(String fname , boolean compress) throws IOException
    +   {
    +     if (logger_.isDebugEnabled()) 
    +        logger_.debug("HDFSClient.hdfsCreate() - started" );
    +      Path filePath = null;
    +      if (!compress || (compress && fname.endsWith(".gz")))
    +        filePath = new Path(fname);
    +      else
    +        filePath = new Path(fname + ".gz");
    +        
    +      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
    +      FSDataOutputStream fsOut = fs.create(filePath, true);
    +      
    +      if (compress) {
    --- End diff --
    
    Since adding a new compression type takes only a few lines of code, it will be good to look into increasing the types of compressed unload files we can write to. I can file a JIRA if there is interest. It is unrelated to focus of this change.


---