Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/30 22:28:28 UTC

[3/7] incubator-apex-malhar git commit: HDHT rename.

HDHT rename.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1576ce7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1576ce7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1576ce7d

Branch: refs/heads/devel-3
Commit: 1576ce7d5366f1a0b79b72511956604092260c00
Parents: 4e47d23
Author: thomas <th...@datatorrent.com>
Authored: Fri Dec 19 17:43:21 2014 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 23 21:37:02 2015 -0800

----------------------------------------------------------------------
 AbstractSinglePortHDSWriter.java | 194 ----------------------------------
 1 file changed, 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1576ce7d/AbstractSinglePortHDSWriter.java
----------------------------------------------------------------------
diff --git a/AbstractSinglePortHDSWriter.java b/AbstractSinglePortHDSWriter.java
deleted file mode 100644
index 04fa602..0000000
--- a/AbstractSinglePortHDSWriter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import javax.validation.constraints.Min;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.common.util.Slice;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-
-/**
- * Operator that receives data on its input port and writes it to the data store.
- * Implements partitioning by mapping the partition key to the store bucket.
- * The derived class supplies the codec for partitioning and key-value serialization.
- * @param <EVENT> the type of event handled by this operator
- */
-public abstract class AbstractSinglePortHDSWriter<EVENT> extends HDHTWriter implements Partitioner<AbstractSinglePortHDSWriter<EVENT>>
-{
-  public interface HDSCodec<EVENT> extends StreamCodec<EVENT>
-  {
-    byte[] getKeyBytes(EVENT event);
-    byte[] getValueBytes(EVENT event);
-    EVENT fromKeyValue(Slice key, byte[] value);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractSinglePortHDSWriter.class);
-
-  protected int partitionMask;
-
-  protected Set<Integer> partitions;
-
-  protected transient HDSCodec<EVENT> codec;
-
-  @Min(1)
-  private int partitionCount = 1;
-
-  public final transient DefaultInputPort<EVENT> input = new DefaultInputPort<EVENT>()
-  {
-    @Override
-    public void process(EVENT event)
-    {
-      try {
-        processEvent(event);
-      } catch (IOException e) {
-        throw new RuntimeException("Error processing " + event, e);
-      }
-    }
-
-    @Override
-    public StreamCodec<EVENT> getStreamCodec()
-    {
-      return getCodec();
-    }
-  };
-
-  public void setPartitionCount(int partitionCount)
-  {
-    this.partitionCount = partitionCount;
-  }
-
-  public int getPartitionCount()
-  {
-    return partitionCount;
-  }
-
-  /**
-   * Returns the storage bucket for the given event. Only one partition can write to a given storage
-   * bucket; by default the bucket is identified by the partition id.
-   *
-   * @param event the event to map to a storage bucket
-   * @return the bucket key
-   */
-  protected long getBucketKey(EVENT event)
-  {
-    return (codec.getPartition(event) & partitionMask);
-  }
-
-  protected void processEvent(EVENT event) throws IOException
-  {
-    byte[] key = codec.getKeyBytes(event);
-    byte[] value = codec.getValueBytes(event);
-    super.put(getBucketKey(event), new Slice(key), value);
-  }
-
-  protected abstract HDSCodec<EVENT> getCodec();
-
-  @Override
-  public void setup(OperatorContext arg0)
-  {
-    LOG.debug("Store {} with partitions {} {}", super.getFileStore(), new PartitionKeys(this.partitionMask, this.partitions));
-    super.setup(arg0);
-    try {
-      this.codec = getCodec();
-      // inject the operator reference, if such a field exists
-      // TODO: replace with a broader solution
-      Class<?> cls = this.codec.getClass();
-      while (cls != null) {
-        for (Field field : cls.getDeclaredFields()) {
-          if (field.getType().isAssignableFrom(this.getClass())) {
-            field.setAccessible(true);
-            field.set(this.codec, this);
-          }
-        }
-        cls = cls.getSuperclass();
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create codec", e);
-    }
-  }
-
-  @Override
-  public Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> definePartitions(Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> partitions, int incrementalCapacity)
-  {
-    boolean isInitialPartition = partitions.iterator().next().getStats() == null;
-
-    if (!isInitialPartition) {
-      // support for dynamic partitioning requires lineage tracking
-      LOG.warn("Dynamic partitioning not implemented");
-      return partitions;
-    }
-
-    int totalCount;
-
-    // use the parallel partitioning capacity when provided
-    if (incrementalCapacity != 0) {
-      totalCount = incrementalCapacity;
-    }
-    // otherwise fall back to the configured partition count
-    else {
-      totalCount = partitionCount;
-    }
-
-    Kryo lKryo = new Kryo();
-    Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
-    for (int i = 0; i < totalCount; i++) {
-      // Kryo.copy fails as it attempts to clone transient fields (input port)
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Output output = new Output(bos);
-      lKryo.writeObject(output, this);
-      output.close();
-      Input lInput = new Input(bos.toByteArray());
-      @SuppressWarnings("unchecked")
-      AbstractSinglePortHDSWriter<EVENT> oper = lKryo.readObject(lInput, this.getClass());
-      newPartitions.add(new DefaultPartition<AbstractSinglePortHDSWriter<EVENT>>(oper));
-    }
-
-    // assign the partition keys
-    DefaultPartition.assignPartitionKeys(newPartitions, input);
-
-    for (Partition<AbstractSinglePortHDSWriter<EVENT>> p : newPartitions) {
-      PartitionKeys pks = p.getPartitionKeys().get(input);
-      p.getPartitionedInstance().partitionMask = pks.mask;
-      p.getPartitionedInstance().partitions = pks.partitions;
-    }
-
-    return newPartitions;
-  }
-
-  @Override
-  public void partitioned(Map<Integer, Partition<AbstractSinglePortHDSWriter<EVENT>>> arg0)
-  {
-  }
-
-}
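----------------------------------------------------------------------

For reference: the removed AbstractSinglePortHDSWriter required a concrete
subclass to supply an HDSCodec covering key/value serialization and the
partition value. A minimal sketch of such a codec, assuming a hypothetical
class name and a simple "key=value" string event format (neither appears in
this commit), might have looked like:

    package com.datatorrent.contrib.hdht;

    import java.nio.charset.StandardCharsets;

    import com.datatorrent.common.util.Slice;

    // Hypothetical example codec for events of the form "key=value".
    public class KeyValueStringCodec implements AbstractSinglePortHDSWriter.HDSCodec<String>
    {
      @Override
      public byte[] getKeyBytes(String event)
      {
        return event.substring(0, event.indexOf('=')).getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public byte[] getValueBytes(String event)
      {
        return event.substring(event.indexOf('=') + 1).getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public String fromKeyValue(Slice key, byte[] value)
      {
        String k = new String(key.buffer, key.offset, key.length, StandardCharsets.UTF_8);
        return k + '=' + new String(value, StandardCharsets.UTF_8);
      }

      // StreamCodec methods, used for the stream connected to the input port.
      @Override
      public Slice toByteArray(String event)
      {
        return new Slice(event.getBytes(StandardCharsets.UTF_8));
      }

      @Override
      public Object fromByteArray(Slice fragment)
      {
        return new String(fragment.buffer, fragment.offset, fragment.length, StandardCharsets.UTF_8);
      }

      @Override
      public int getPartition(String event)
      {
        // determines the stream partition and, after masking, the storage
        // bucket chosen by getBucketKey()
        return event.substring(0, event.indexOf('=')).hashCode();
      }
    }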
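getBucketKey() maps an event to a storage bucket by masking the codec's
partition value with the partition mask that definePartitions() assigned.
A standalone illustration of that arithmetic (the concrete values are made
up for the example):

    // Illustration of the masking in getBucketKey(); values are assumptions.
    public class BucketKeyDemo
    {
      public static void main(String[] args)
      {
        int partitionMask = 0x03;  // lower-bits mask assigned for 4 partitions
        int eventPartition = 10;   // as returned by codec.getPartition(event)
        long bucketKey = eventPartition & partitionMask;
        System.out.println(bucketKey);  // prints 2: the event lands in bucket 2
      }
    }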
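definePartitions() clones the operator through a Kryo write/read round trip
rather than Kryo.copy(), because copy() attempts to clone the transient input
port and fails. The same pattern in isolation, as a standalone sketch (the
utility class name is hypothetical):

    import java.io.ByteArrayOutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    // Hypothetical utility showing the serialize/deserialize clone used above.
    public class KryoCloneUtil
    {
      @SuppressWarnings("unchecked")
      public static <T> T clone(Kryo kryo, T prototype)
      {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Output output = new Output(bos);
        kryo.writeObject(output, prototype);
        output.close();
        Input input = new Input(bos.toByteArray());
        // transient fields are skipped by Kryo's default field serializer and
        // are re-initialized by the no-arg constructor on the copy
        return kryo.readObject(input, (Class<T>)prototype.getClass());
      }
    }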