You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hawq.apache.org by sansanichfb <gi...@git.apache.org> on 2018/03/01 01:24:17 UTC

[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin

Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1344#discussion_r171440009
  
    --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgnitePartitionFragmenter.java ---
    @@ -0,0 +1,411 @@
    +package org.apache.hawq.pxf.plugins.ignite;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.FragmentsStats;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.text.ParseException;
    +import java.text.SimpleDateFormat;
    +import java.util.Calendar;
    +import java.util.Date;
    +import java.util.List;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.commons.lang.ArrayUtils;
    +
    +
    +/**
    + * PXF-Ignite fragmenter class
    + * 
    + * This fragmenter works just like the one in PXF JDBC plugin
    + */
    +public class IgnitePartitionFragmenter extends Fragmenter {
    +    /**
    +     * Creates the fragmenter and validates the partitioning parameters of the request.
    +     *
    +     * If 'PARTITION_BY' is not set, no partitioning is configured and the remaining
    +     * parameters are not read. Otherwise 'PARTITION_BY' must match
    +     * 'column_name:DATE|INT|ENUM', 'RANGE' must be present, and 'INTERVAL' must be
    +     * present for every partition type except ENUM.
    +     *
    +     * @param inputData the PXF request data the user properties are read from
    +     *
    +     * @throws UserDataException if the request parameter is malformed
    +     */
    +    public IgnitePartitionFragmenter(InputData inputData) throws UserDataException {
    +        super(inputData);
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("Constructor started");
    +        }
    +
    +        String partitionByStr = inputData.getUserProperty("PARTITION_BY");
    +        if (partitionByStr == null) {
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Constructor successful; partition was not used");
    +            }
    +            return;
    +        }
    +
    +        try {
    +            partitionBy = partitionByStr.split(":");
    +            partitionType = PartitionType.typeOf(partitionBy[1]);
    +        }
    +        catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e1) {
    +            throw new UserDataException("The parameter 'PARTITION_BY' is invalid. The pattern is 'column_name:DATE|INT|ENUM'");
    +        }
    +
    +        //parse and validate parameter-RANGE
    +        try {
    +            String rangeStr = inputData.getUserProperty("RANGE");
    +            if (rangeStr == null) {
    +                throw new UserDataException("The parameter 'RANGE' must be specified along with 'PARTITION_BY'");
    +            }
    +            range = rangeStr.split(":");
    +            // String.split() drops trailing empty strings, so a value such as ":" yields
    +            // an empty array; reject it here instead of failing later on range[0]
    +            if (range.length == 0) {
    +                throw new UserDataException("The parameter 'RANGE' is invalid, the pattern is 'start_value[:end_value]'");
    +            }
    +            if (range.length == 1 && partitionType != PartitionType.ENUM) {
    +                throw new UserDataException("The parameter 'RANGE' does not specify '[:end_value]'");
    +            }
    +            if (partitionType == PartitionType.INT) {
    +                // Check the bounds now so that a malformed number is reported as a
    +                // UserDataException (via the catch below) and not as a
    +                // NumberFormatException when the fragments are built
    +                Integer.parseInt(range[0]);
    +                Integer.parseInt(range[1]);
    +            }
    +        }
    +        catch (IllegalArgumentException e) {
    +            throw new UserDataException("The parameter 'RANGE' is invalid, the pattern is 'start_value[:end_value]'");
    +        }
    +
    +        //parse and validate parameter-INTERVAL
    +        try {
    +            String intervalStr = inputData.getUserProperty("INTERVAL");
    +            if (intervalStr != null) {
    +                interval = intervalStr.split(":");
    +                // Same String.split() corner case as for 'RANGE': ":" gives an empty array
    +                if (interval.length == 0) {
    +                    throw new UserDataException("The parameter 'INTERVAL' invalid. The pattern is 'interval_num[:interval_unit]'");
    +                }
    +                intervalNum = Integer.parseInt(interval[0]);
    +                if (interval.length > 1) {
    +                    intervalType = IntervalType.typeOf(interval[1]);
    +                }
    +                if (interval.length == 1 && partitionType == PartitionType.DATE) {
    +                    throw new UserDataException("The parameter 'INTERVAL' does not specify unit [:year|month|day]");
    +                }
    +            }
    +            else if (partitionType != PartitionType.ENUM) {
    +                throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY'");
    +            }
    +            if (intervalNum < 1) {
    +                throw new UserDataException("The parameter 'INTERVAL' must be at least 1. The actual is '" + intervalNum + "'");
    +            }
    +        }
    +        catch (IllegalArgumentException e) {
    +            // Covers NumberFormatException from Integer.parseInt() as well
    +            throw new UserDataException("The parameter 'INTERVAL' invalid. The pattern is 'interval_num[:interval_unit]'");
    +        }
    +
    +        //parse date values
    +        try {
    +            if (partitionType == PartitionType.DATE) {
    +                // range has at least two elements here: this was checked above for non-ENUM types
    +                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
    +                rangeStart = Calendar.getInstance();
    +                rangeStart.setTime(df.parse(range[0]));
    +                rangeEnd = Calendar.getInstance();
    +                rangeEnd.setTime(df.parse(range[1]));
    +            }
    +        } catch (ParseException e) {
    +            throw new UserDataException("The parameter 'RANGE' has invalid date format. Expected format is 'yyyy-MM-dd'");
    +        }
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("Constructor successful; some partition used");
    +        }
    +    }
    +
    +    /**
    +     * Statistics for the Ignite table are not available: this method always fails.
    +     *
    +     * @throws UnsupportedOperationException on every call
    +     */
    +    @Override
    +    public FragmentsStats getFragmentsStats() throws UnsupportedOperationException {
    +        final String reason = "ANALYZE for Ignite plugin is not supported";
    +        throw new UnsupportedOperationException(reason);
    +    }
    +
    +    /**
    +     * Returns list of fragments for Ignite table queries.
    +     *
    +     * Without a partition a single fragment with no metadata is produced. Otherwise
    +     * the fragment metadata depends on the partition type:
    +     * DATE - start and end of the fragment in milliseconds, concatenated;
    +     * INT - start and end of the fragment, concatenated;
    +     * ENUM - the bytes of one 'RANGE' element per fragment.
    +     *
    +     * @throws UnsupportedOperationException if a partition of unknown type was found
    +     * @throws NumberFormatException if the partition type is INT and a 'RANGE' bound is not a valid integer
    +     *
    +     * @return a list of fragments
    +     */
    +    @Override
    +    public List<Fragment> getFragments() throws UnsupportedOperationException {
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("getFragments() called; dataSource is '" + inputData.getDataSource() + "'");
    +        }
    +
    +        byte[] fragmentMetadata = null;
    +        byte[] fragmentUserdata = null;
    +
    +        if (partitionType == null) {
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("getFragments() found no partition");
    +            }
    +            Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata);
    +            fragments.add(fragment);
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("getFragments() successful");
    +            }
    +            return fragments;
    +        }
    +
    +        switch (partitionType) {
    +            case DATE: {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("getFragments() found DATE partition");
    +                }
    +
    +                Calendar fragStart = rangeStart;
    +                while (fragStart.before(rangeEnd)) {
    +                    // Work on a copy so that the start of the current fragment is not moved
    +                    Calendar fragEnd = (Calendar) fragStart.clone();
    +                    switch (intervalType) {
    +                        case DAY:
    +                            fragEnd.add(Calendar.DAY_OF_MONTH, intervalNum);
    +                            break;
    +                        case MONTH:
    +                            fragEnd.add(Calendar.MONTH, intervalNum);
    +                            break;
    +                        case YEAR:
    +                            fragEnd.add(Calendar.YEAR, intervalNum);
    +                            break;
    +                    }
    +                    // The last fragment must not go beyond the end of the range
    +                    if (fragEnd.after(rangeEnd)) {
    +                        fragEnd = (Calendar) rangeEnd.clone();
    +                    }
    +
    +                    // Note that the date is stored in milliseconds
    +                    byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis());
    +                    byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis());
    +                    fragmentMetadata = ArrayUtils.addAll(msStart, msEnd);
    +
    +                    Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata);
    +                    fragments.add(fragment);
    +
    +                    // Continue the previous fragment
    +                    fragStart = fragEnd;
    +                }
    +                break;
    +            }
    +            case INT: {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("getFragments() found INT partition");
    +                }
    +                // Named differently from the Calendar fields 'rangeStart' and 'rangeEnd' to avoid shadowing them
    +                int intRangeStart = Integer.parseInt(range[0]);
    +                int intRangeEnd = Integer.parseInt(range[1]);
    +
    +                int fragStart = intRangeStart;
    +                while (fragStart < intRangeEnd) {
    +                    // Add in 64 bits: a 32-bit 'fragStart + intervalNum' can wrap around to a
    +                    // negative value near Integer.MAX_VALUE, which would bypass the clamp and
    +                    // restart the loop from a negative 'fragStart'
    +                    long unclampedEnd = (long) fragStart + intervalNum;
    +                    // The result is never greater than intRangeEnd, so the cast is safe
    +                    int fragEnd = (int) Math.min(unclampedEnd, (long) intRangeEnd);
    +
    +                    byte[] bStart = ByteUtil.getBytes(fragStart);
    +                    byte[] bEnd = ByteUtil.getBytes(fragEnd);
    +                    fragmentMetadata = ArrayUtils.addAll(bStart, bEnd);
    +
    +                    Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata);
    +                    fragments.add(fragment);
    +
    +                    // Continue the previous fragment
    +                    fragStart = fragEnd;
    +                }
    +                break;
    +            }
    +            case ENUM: {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("getFragments() found ENUM partition");
    +                }
    +                for (String frag : range) {
    +                    // NOTE(review): getBytes() uses the platform default charset; the code that
    +                    // decodes this metadata must use the same charset - confirm
    +                    fragmentMetadata = frag.getBytes();
    +                    Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata);
    +                    fragments.add(fragment);
    +                }
    +                break;
    +            }
    +            default: {
    +                throw new UnsupportedOperationException("getFragments() found a partition of unknown type and failed");
    +            }
    +        }
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("getFragments() successful");
    +        }
    +        return fragments;
    +    }
    +
    +    /**
    +     * Insert partition constraints into the prepared SQL query.
    +     * 
    +     * @param inputData pre-validated PXF InputData
    +     * @param sb the SQL query that is prepared for appending WHERE constraints.
    +     * Other SQL statements may be present, but they must be complete. Note that no check is performed to check their "completeness"
    +     */
    +    public static StringBuilder buildFragmenterSql(InputData inputData, StringBuilder sb) {
    --- End diff --
    
    Do we really want to mutate the input parameter and then return it?


---