You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/06 03:38:24 UTC

[1/2] beam git commit: Improvements to regex transform

Repository: beam
Updated Branches:
  refs/heads/master 83c9831f4 -> 8ea8135ca


Improvements to regex transform

 - named regex groups
 - return all groups
 - allow building from Pattern


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/690677d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/690677d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/690677d6

Branch: refs/heads/master
Commit: 690677d68901622d1d07b18d5490fa132486bff6
Parents: 49aeef9
Author: Jesse Anderson <je...@smokinghand.com>
Authored: Wed Dec 28 15:10:34 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jan 5 19:37:35 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Regex.java   | 589 +++++++++++++++++--
 .../apache/beam/sdk/transforms/RegexTest.java   | 127 +++-
 2 files changed, 679 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/690677d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
index 14c5d1b..7e85605 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -53,13 +55,76 @@ public class Regex {
 
   /**
    * Returns a {@link Regex.Matches} {@link PTransform} that checks if the entire line matches the
+   * Regex. Returns the entire line (group 0) as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   */
+  public static Matches matches(Pattern pattern) {
+    return matches(pattern, 0);
+  }
+
+  /**
+   * Returns a {@link Regex.Matches} {@link PTransform} that checks if the entire line matches the
    * Regex. Returns the group as a {@link PCollection}.
    *
    * @param regex The regular expression to run
    * @param group The Regex group to return as a PCollection
    */
   public static Matches matches(String regex, int group) {
-    return new Matches(regex, group);
+    return matches(Pattern.compile(regex), group);
+  }
+
+  /**
+   * Returns a {@link Regex.Matches} {@link PTransform} that checks if the entire line matches the
+   * Regex. Returns the group as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param group The Regex group to return as a PCollection
+   */
+  public static Matches matches(Pattern pattern, int group) {
+    return new Matches(pattern, group);
+  }
+
+  /**
+   * Returns a {@link Regex.MatchesName} {@link PTransform} that checks if the entire line matches
+   * the Regex. Returns the group as a {@link PCollection}.
+   *
+   * @param regex The regular expression to run
+   * @param groupName The Regex group name to return as a PCollection
+   */
+  public static MatchesName matches(String regex, String groupName) {
+    return matches(Pattern.compile(regex), groupName);
+  }
+
+  /**
+   * Returns a {@link Regex.MatchesName} {@link PTransform} that checks if the entire line matches
+   * the Regex. Returns the group as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param groupName The Regex group name to return as a PCollection
+   */
+  public static MatchesName matches(Pattern pattern, String groupName) {
+    return new MatchesName(pattern, groupName);
+  }
+
+  /**
+   * Returns a {@link Regex.AllMatches} {@link PTransform} that checks if the entire line matches
+   * the Regex. Returns all groups as a List&lt;String&gt; in a {@link PCollection}.
+   *
+   * @param regex The regular expression to run
+   */
+  public static AllMatches allMatches(String regex) {
+    return allMatches(Pattern.compile(regex));
+  }
+
+  /**
+   * Returns a {@link Regex.AllMatches} {@link PTransform} that checks if the entire line matches
+   * the Regex. Returns all groups as a List&lt;String&gt; in a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   */
+  public static AllMatches allMatches(Pattern pattern) {
+    return new AllMatches(pattern);
   }
 
   /**
@@ -71,7 +136,44 @@ public class Regex {
    * @param valueGroup The Regex group to use the value
    */
   public static MatchesKV matchesKV(String regex, int keyGroup, int valueGroup) {
-    return new MatchesKV(regex, keyGroup, valueGroup);
+    return matchesKV(Pattern.compile(regex), keyGroup, valueGroup);
+  }
+
+  /**
+   * Returns a {@link Regex.MatchesKV} {@link PTransform} that checks if the entire line matches the
+   * Regex. Returns the specified groups as the key and value as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param keyGroup The Regex group to use as the key
+   * @param valueGroup The Regex group to use the value
+   */
+  public static MatchesKV matchesKV(Pattern pattern, int keyGroup, int valueGroup) {
+    return new MatchesKV(pattern, keyGroup, valueGroup);
+  }
+
+  /**
+   * Returns a {@link Regex.MatchesNameKV} {@link PTransform} that checks if the entire line matches
+   * the Regex. Returns the specified groups as the key and value as a {@link PCollection}.
+   *
+   * @param regex The regular expression to run
+   * @param keyGroupName The Regex group name to use as the key
+   * @param valueGroupName The Regex group name to use the value
+   */
+  public static MatchesNameKV matchesKV(String regex, String keyGroupName, String valueGroupName) {
+    return matchesKV(Pattern.compile(regex), keyGroupName, valueGroupName);
+  }
+
+  /**
+   * Returns a {@link Regex.MatchesNameKV} {@link PTransform} that checks if the entire line matches
+   * the Regex. Returns the specified groups as the key and value as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param keyGroupName The Regex group name to use as the key
+   * @param valueGroupName The Regex group name to use the value
+   */
+  public static MatchesNameKV matchesKV(
+      Pattern pattern, String keyGroupName, String valueGroupName) {
+    return new MatchesNameKV(pattern, keyGroupName, valueGroupName);
   }
 
   /**
@@ -86,13 +188,76 @@ public class Regex {
 
   /**
    * Returns a {@link Regex.Find} {@link PTransform} that checks if a portion of the line matches
+   * the Regex. Returns the entire line (group 0) as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   */
+  public static Find find(Pattern pattern) {
+    return find(pattern, 0);
+  }
+
+  /**
+   * Returns a {@link Regex.Find} {@link PTransform} that checks if a portion of the line matches
    * the Regex. Returns the group as a {@link PCollection}.
    *
    * @param regex The regular expression to run
    * @param group The Regex group to return as a PCollection
    */
   public static Find find(String regex, int group) {
-    return new Find(regex, group);
+    return find(Pattern.compile(regex), group);
+  }
+
+  /**
+   * Returns a {@link Regex.Find} {@link PTransform} that checks if a portion of the line matches
+   * the Regex. Returns the group as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param group The Regex group to return as a PCollection
+   */
+  public static Find find(Pattern pattern, int group) {
+    return new Find(pattern, group);
+  }
+
+  /**
+   * Returns a {@link Regex.FindName} {@link PTransform} that checks if a portion of the line
+   * matches the Regex. Returns the group as a {@link PCollection}.
+   *
+   * @param regex The regular expression to run
+   * @param groupName The Regex group name to return as a PCollection
+   */
+  public static FindName find(String regex, String groupName) {
+    return find(Pattern.compile(regex), groupName);
+  }
+
+  /**
+   * Returns a {@link Regex.FindName} {@link PTransform} that checks if a portion of the line
+   * matches the Regex. Returns the group as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param groupName The Regex group name to return as a PCollection
+   */
+  public static FindName find(Pattern pattern, String groupName) {
+    return new FindName(pattern, groupName);
+  }
+
+  /**
+   * Returns a {@link Regex.FindAll} {@link PTransform} that checks if a portion of the line matches
+   * the Regex. Returns all the groups as a List&lt;String&gt; in a {@link PCollection}.
+   *
+   * @param regex The regular expression to run
+   */
+  public static FindAll findAll(String regex) {
+    return findAll(Pattern.compile(regex));
+  }
+
+  /**
+   * Returns a {@link Regex.FindAll} {@link PTransform} that checks if a portion of the line matches
+   * the Regex. Returns all the groups as a List&lt;String&gt; in a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   */
+  public static FindAll findAll(Pattern pattern) {
+    return new FindAll(pattern);
   }
 
   /**
@@ -104,7 +269,43 @@ public class Regex {
    * @param valueGroup The Regex group to use the value
    */
   public static FindKV findKV(String regex, int keyGroup, int valueGroup) {
-    return new FindKV(regex, keyGroup, valueGroup);
+    return findKV(Pattern.compile(regex), keyGroup, valueGroup);
+  }
+
+  /**
+   * Returns a {@link Regex.FindKV} {@link PTransform} that checks if a portion of the line matches
+   * the Regex. Returns the specified groups as the key and value as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param keyGroup The Regex group to use as the key
+   * @param valueGroup The Regex group to use the value
+   */
+  public static FindKV findKV(Pattern pattern, int keyGroup, int valueGroup) {
+    return new FindKV(pattern, keyGroup, valueGroup);
+  }
+
+  /**
+   * Returns a {@link Regex.FindNameKV} {@link PTransform} that checks if a portion of the line
+   * matches the Regex. Returns the specified groups as the key and value as a {@link PCollection}.
+   *
+   * @param regex The regular expression to run
+   * @param keyGroupName The Regex group name to use as the key
+   * @param valueGroupName The Regex group name to use the value
+   */
+  public static FindNameKV findKV(String regex, String keyGroupName, String valueGroupName) {
+    return findKV(Pattern.compile(regex), keyGroupName, valueGroupName);
+  }
+
+  /**
+   * Returns a {@link Regex.FindNameKV} {@link PTransform} that checks if a portion of the line
+   * matches the Regex. Returns the specified groups as the key and value as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param keyGroupName The Regex group name to use as the key
+   * @param valueGroupName The Regex group name to use the value
+   */
+  public static FindNameKV findKV(Pattern pattern, String keyGroupName, String valueGroupName) {
+    return new FindNameKV(pattern, keyGroupName, valueGroupName);
   }
 
   /**
@@ -116,7 +317,19 @@ public class Regex {
    * @param replacement The string to be substituted for each match
    */
   public static ReplaceAll replaceAll(String regex, String replacement) {
-    return new ReplaceAll(regex, replacement);
+    return replaceAll(Pattern.compile(regex), replacement);
+  }
+
+  /**
+   * Returns a {@link Regex.ReplaceAll} {@link PTransform} that checks if a portion of the line
+   * matches the Regex and replaces all matches with the replacement String. Returns the group as a
+   * {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param replacement The string to be substituted for each match
+   */
+  public static ReplaceAll replaceAll(Pattern pattern, String replacement) {
+    return new ReplaceAll(pattern, replacement);
   }
 
   /**
@@ -128,7 +341,19 @@ public class Regex {
    * @param replacement The string to be substituted for each match
    */
   public static ReplaceFirst replaceFirst(String regex, String replacement) {
-    return new ReplaceFirst(regex, replacement);
+    return replaceFirst(Pattern.compile(regex), replacement);
+  }
+
+  /**
+   * Returns a {@link Regex.ReplaceFirst} {@link PTransform} that checks if a portion of the line
+   * matches the Regex and replaces the first match with the replacement String. Returns the group
+   * as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param replacement The string to be substituted for each match
+   */
+  public static ReplaceFirst replaceFirst(Pattern pattern, String replacement) {
+    return new ReplaceFirst(pattern, replacement);
   }
 
   /**
@@ -139,7 +364,18 @@ public class Regex {
    * @param regex The regular expression to run
    */
   public static Split split(String regex) {
-    return split(regex, false);
+    return split(Pattern.compile(regex), false);
+  }
+
+  /**
+   * Returns a {@link Regex.Split} {@link PTransform} that splits a string on the regular expression
+   * and then outputs each item. It will not output empty items. Returns the group as a {@link
+   * PCollection}.
+   *
+   * @param pattern The regular expression to run
+   */
+  public static Split split(Pattern pattern) {
+    return split(pattern, false);
   }
 
   /**
@@ -150,7 +386,18 @@ public class Regex {
    * @param outputEmpty Should empty be output. True to output empties and false if not.
    */
   public static Split split(String regex, boolean outputEmpty) {
-    return new Split(regex, outputEmpty);
+    return split(Pattern.compile(regex), outputEmpty);
+  }
+
+  /**
+   * Returns a {@link Regex.Split} {@link PTransform} that splits a string on the regular expression
+   * and then outputs each item. Returns the group as a {@link PCollection}.
+   *
+   * @param pattern The regular expression to run
+   * @param outputEmpty Should empty be output. True to output empties and false if not.
+   */
+  public static Split split(Pattern pattern, boolean outputEmpty) {
+    return new Split(pattern, outputEmpty);
   }
 
   /**
@@ -171,11 +418,11 @@ public class Regex {
    * }</pre>
    */
   public static class Matches extends PTransform<PCollection<String>, PCollection<String>> {
-    Pattern pattern;
+    final Pattern pattern;
     int group;
 
-    public Matches(String regex, int group) {
-      this.pattern = Pattern.compile(regex);
+    public Matches(Pattern pattern, int group) {
+      this.pattern = pattern;
       this.group = group;
     }
 
@@ -185,7 +432,7 @@ public class Regex {
               new DoFn<String, String>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) throws Exception {
-                  Matcher m = pattern.matcher((String) c.element());
+                  Matcher m = pattern.matcher(c.element());
 
                   if (m.matches()) {
                     c.output(m.group(group));
@@ -196,6 +443,96 @@ public class Regex {
   }
 
   /**
+   * {@code Regex.MatchesName} takes a {@code PCollection<String>} and returns a {@code
+   * PCollection<String>} containing, for each input element that the Regex matches entirely, the
+   * value of the named Regex group.
+   *
+   * <p>This transform runs a Regex on the entire input line. If the entire line does not match the
+   * Regex, the line will not be output. If it does match the entire line, the group in the Regex
+   * will be used. The output will be the Regex group.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<String> words = ...;
+   * PCollection<String> values =
+   *     words.apply(Regex.matches("myregex (?<namedgroup>mygroup)", "namedgroup"));
+   * }</pre>
+   */
+  public static class MatchesName extends PTransform<PCollection<String>, PCollection<String>> {
+    final Pattern pattern;
+    String groupName;
+
+    public MatchesName(Pattern pattern, String groupName) {
+      this.pattern = pattern;
+      this.groupName = groupName;
+    }
+
+    public PCollection<String> expand(PCollection<String> in) {
+      return in.apply(
+          ParDo.of(
+              new DoFn<String, String>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) throws Exception {
+                  Matcher m = pattern.matcher(c.element());
+
+                  if (m.matches()) {
+                    c.output(m.group(groupName));
+                  }
+                }
+              }));
+    }
+  }
+
+  /**
+   * {@code Regex.AllMatches} takes a {@code PCollection<String>} and returns a {@code
+   * PCollection<List<String>>} containing, for each matching input element, the values of all the
+   * Regex groups (including group 0, the entire match).
+   *
+   * <p>This transform runs a Regex on the entire input line. If the entire line does not match the
+   * Regex, the line will not be output. If it does match the entire line, the groups in the Regex
+   * will be used. The output will be all of the Regex groups.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<String> words = ...;
+   * PCollection<List<String>> values =
+   *     words.apply(Regex.allMatches("myregex (mygroup)"));
+   * }</pre>
+   */
+  public static class AllMatches
+      extends PTransform<PCollection<String>, PCollection<List<String>>> {
+    final Pattern pattern;
+
+    public AllMatches(Pattern pattern) {
+      this.pattern = pattern;
+    }
+
+    public PCollection<List<String>> expand(PCollection<String> in) {
+      return in.apply(
+          ParDo.of(
+              new DoFn<String, List<String>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) throws Exception {
+                  Matcher m = pattern.matcher(c.element());
+
+                  if (m.matches()) {
+                    ArrayList list = new ArrayList(m.groupCount());
+
+                    // +1 because groupCount() does not count group 0 (the entire match)
+                    for (int i = 0; i < m.groupCount() + 1; i++) {
+                      list.add(m.group(i));
+                    }
+
+                    c.output(list);
+                  }
+                }
+              }));
+    }
+  }
+
+  /**
    * {@code Regex.MatchesKV<KV<String, String>>} takes a {@code PCollection<String>} and returns a
    * {@code PCollection<KV<String, String>>} representing the key and value extracted from the Regex
    * groups of the input {@code PCollection} to the number of times that element occurs in the
@@ -215,11 +552,11 @@ public class Regex {
    */
   public static class MatchesKV
       extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
-    Pattern pattern;
+    final Pattern pattern;
     int keyGroup, valueGroup;
 
-    public MatchesKV(String regex, int keyGroup, int valueGroup) {
-      this.pattern = Pattern.compile(regex);
+    public MatchesKV(Pattern pattern, int keyGroup, int valueGroup) {
+      this.pattern = pattern;
       this.keyGroup = keyGroup;
       this.valueGroup = valueGroup;
     }
@@ -230,7 +567,7 @@ public class Regex {
               new DoFn<String, KV<String, String>>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) throws Exception {
-                  Matcher m = pattern.matcher((String) c.element());
+                  Matcher m = pattern.matcher(c.element());
 
                   if (m.find()) {
                     c.output(KV.of(m.group(keyGroup), m.group(valueGroup)));
@@ -241,6 +578,52 @@ public class Regex {
   }
 
   /**
+   * {@code Regex.MatchesNameKV<KV<String, String>>} takes a {@code PCollection<String>} and returns
+   * a {@code PCollection<KV<String, String>>} representing the key and value extracted from the
+   * Regex groups of the input {@code PCollection} to the number of times that element occurs in the
+   * input.
+   *
+   * <p>This transform runs a Regex on the entire input line. If the entire line does not match the
+   * Regex, the line will not be output. If it does match the entire line, the groups in the Regex
+   * will be used. The key will be the key's group and the value will be the value's group.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<String> words = ...;
+   * PCollection<KV<String, String>> keysAndValues =
+   *     words.apply(Regex.matchesKV("myregex (?<keyname>mykeygroup) (?<valuename>myvaluegroup)",
+   *       "keyname", "valuename"));
+   * }</pre>
+   */
+  public static class MatchesNameKV
+      extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
+    final Pattern pattern;
+    String keyGroupName, valueGroupName;
+
+    public MatchesNameKV(Pattern pattern, String keyGroupName, String valueGroupName) {
+      this.pattern = pattern;
+      this.keyGroupName = keyGroupName;
+      this.valueGroupName = valueGroupName;
+    }
+
+    public PCollection<KV<String, String>> expand(PCollection<String> in) {
+      return in.apply(
+          ParDo.of(
+              new DoFn<String, KV<String, String>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) throws Exception {
+                  Matcher m = pattern.matcher(c.element());
+
+                  if (m.find()) {
+                    c.output(KV.of(m.group(keyGroupName), m.group(valueGroupName)));
+                  }
+                }
+              }));
+    }
+  }
+
+  /**
    * {@code Regex.Find<String>} takes a {@code PCollection<String>} and returns a {@code
    * PCollection<String>} representing the value extracted from the Regex groups of the input {@code
    * PCollection} to the number of times that element occurs in the input.
@@ -258,11 +641,11 @@ public class Regex {
    * }</pre>
    */
   public static class Find extends PTransform<PCollection<String>, PCollection<String>> {
-    Pattern pattern;
+    final Pattern pattern;
     int group;
 
-    public Find(String regex, int group) {
-      this.pattern = Pattern.compile(regex);
+    public Find(Pattern pattern, int group) {
+      this.pattern = pattern;
       this.group = group;
     }
 
@@ -272,7 +655,7 @@ public class Regex {
               new DoFn<String, String>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) throws Exception {
-                  Matcher m = pattern.matcher((String) c.element());
+                  Matcher m = pattern.matcher(c.element());
 
                   if (m.find()) {
                     c.output(m.group(group));
@@ -283,6 +666,95 @@ public class Regex {
   }
 
   /**
+   * {@code Regex.FindName} takes a {@code PCollection<String>} and returns a {@code
+   * PCollection<String>} containing, for each input element in which the Regex is found, the value
+   * of the named Regex group.
+   *
+   * <p>This transform runs a Regex on the entire input line. If a portion of the line does not
+   * match the Regex, the line will not be output. If it does match a portion of the line, the group
+   * in the Regex will be used. The output will be the Regex group.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<String> words = ...;
+   * PCollection<String> values =
+   *     words.apply(Regex.find("myregex (?<namedgroup>mygroup)", "namedgroup"));
+   * }</pre>
+   */
+  public static class FindName extends PTransform<PCollection<String>, PCollection<String>> {
+    final Pattern pattern;
+    String groupName;
+
+    public FindName(Pattern pattern, String groupName) {
+      this.pattern = pattern;
+      this.groupName = groupName;
+    }
+
+    public PCollection<String> expand(PCollection<String> in) {
+      return in.apply(
+          ParDo.of(
+              new DoFn<String, String>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) throws Exception {
+                  Matcher m = pattern.matcher(c.element());
+
+                  if (m.find()) {
+                    c.output(m.group(groupName));
+                  }
+                }
+              }));
+    }
+  }
+
+  /**
+   * {@code Regex.FindAll} takes a {@code PCollection<String>} and returns a {@code
+   * PCollection<List<String>>} containing, for each input element in which the Regex is found, the
+   * values of all the Regex groups (including group 0, the matched text).
+   *
+   * <p>This transform runs a Regex on the entire input line. If a portion of the line does not
+   * match the Regex, the line will not be output. If it does match a portion of the line, the
+   * groups in the Regex will be used. The output will be the Regex groups.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<String> words = ...;
+   * PCollection<List<String>> values =
+   *     words.apply(Regex.findAll("myregex (mygroup)"));
+   * }</pre>
+   */
+  public static class FindAll extends PTransform<PCollection<String>, PCollection<List<String>>> {
+    final Pattern pattern;
+
+    public FindAll(Pattern pattern) {
+      this.pattern = pattern;
+    }
+
+    public PCollection<List<String>> expand(PCollection<String> in) {
+      return in.apply(
+          ParDo.of(
+              new DoFn<String, List<String>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) throws Exception {
+                  Matcher m = pattern.matcher(c.element());
+
+                  if (m.find()) {
+                    ArrayList list = new ArrayList(m.groupCount());
+
+                    // +1 because groupCount() does not count group 0 (the matched text)
+                    for (int i = 0; i < m.groupCount() + 1; i++) {
+                      list.add(m.group(i));
+                    }
+
+                    c.output(list);
+                  }
+                }
+              }));
+    }
+  }
+
+  /**
    * {@code Regex.MatchesKV<KV<String, String>>} takes a {@code PCollection<String>} and returns a
    * {@code PCollection<KV<String, String>>} representing the key and value extracted from the Regex
    * groups of the input {@code PCollection} to the number of times that element occurs in the
@@ -303,11 +775,11 @@ public class Regex {
    */
   public static class FindKV
       extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
-    Pattern pattern;
+    final Pattern pattern;
     int keyGroup, valueGroup;
 
-    public FindKV(String regex, int keyGroup, int valueGroup) {
-      this.pattern = Pattern.compile(regex);
+    public FindKV(Pattern pattern, int keyGroup, int valueGroup) {
+      this.pattern = pattern;
       this.keyGroup = keyGroup;
       this.valueGroup = valueGroup;
     }
@@ -318,7 +790,7 @@ public class Regex {
               new DoFn<String, KV<String, String>>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) throws Exception {
-                  Matcher m = pattern.matcher((String) c.element());
+                  Matcher m = pattern.matcher(c.element());
 
                   if (m.find()) {
                     c.output(KV.of(m.group(keyGroup), m.group(valueGroup)));
@@ -329,6 +801,53 @@ public class Regex {
   }
 
   /**
+   * {@code Regex.FindNameKV} takes a {@code PCollection<String>} and returns a {@code
+   * PCollection<KV<String, String>>} containing, for each input element in which the Regex is
+   * found, a {@code KV} whose key and value are the values of the named key and value Regex
+   * groups.
+   *
+   * <p>This transform runs a Regex on the entire input line. If a portion of the line does not
+   * match the Regex, the line will not be output. If it does match a portion of the line, the
+   * groups in the Regex will be used. The key will be the key's group and the value will be the
+   * value's group.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<String> words = ...;
+   * PCollection<KV<String, String>> keysAndValues =
+   *     words.apply(Regex.findKV("myregex (?<keyname>mykeygroup) (?<valuename>myvaluegroup)",
+   *       "keyname", "valuename"));
+   * }</pre>
+   */
+  public static class FindNameKV
+      extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
+    final Pattern pattern;
+    String keyGroupName, valueGroupName;
+
+    public FindNameKV(Pattern pattern, String keyGroupName, String valueGroupName) {
+      this.pattern = pattern;
+      this.keyGroupName = keyGroupName;
+      this.valueGroupName = valueGroupName;
+    }
+
+    public PCollection<KV<String, String>> expand(PCollection<String> in) {
+      return in.apply(
+          ParDo.of(
+              new DoFn<String, KV<String, String>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) throws Exception {
+                  Matcher m = pattern.matcher(c.element());
+
+                  if (m.find()) {
+                    c.output(KV.of(m.group(keyGroupName), m.group(valueGroupName)));
+                  }
+                }
+              }));
+    }
+  }
+
+  /**
    * {@code Regex.ReplaceAll<String>} takes a {@code PCollection<String>} and returns a {@code
    * PCollection<String>} with all Strings that matched the Regex being replaced with the
    * replacement string.
@@ -346,11 +865,11 @@ public class Regex {
    * }</pre>
    */
   public static class ReplaceAll extends PTransform<PCollection<String>, PCollection<String>> {
-    Pattern pattern;
+    final Pattern pattern;
     String replacement;
 
-    public ReplaceAll(String regex, String replacement) {
-      this.pattern = Pattern.compile(regex);
+    public ReplaceAll(Pattern pattern, String replacement) {
+      this.pattern = pattern;
       this.replacement = replacement;
     }
 
@@ -360,7 +879,7 @@ public class Regex {
               new DoFn<String, String>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) throws Exception {
-                  Matcher m = pattern.matcher((String) c.element());
+                  Matcher m = pattern.matcher(c.element());
                   c.output(m.replaceAll(replacement));
                 }
               }));
@@ -385,11 +904,11 @@ public class Regex {
    * }</pre>
    */
   public static class ReplaceFirst extends PTransform<PCollection<String>, PCollection<String>> {
-    Pattern pattern;
+    final Pattern pattern;
     String replacement;
 
-    public ReplaceFirst(String regex, String replacement) {
-      this.pattern = Pattern.compile(regex);
+    public ReplaceFirst(Pattern pattern, String replacement) {
+      this.pattern = pattern;
       this.replacement = replacement;
     }
 
@@ -399,7 +918,7 @@ public class Regex {
               new DoFn<String, String>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) throws Exception {
-                  Matcher m = pattern.matcher((String) c.element());
+                  Matcher m = pattern.matcher(c.element());
                   c.output(m.replaceFirst(replacement));
                 }
               }));
@@ -426,11 +945,11 @@ public class Regex {
    * }</pre>
    */
   public static class Split extends PTransform<PCollection<String>, PCollection<String>> {
-    Pattern pattern;
+    final Pattern pattern;
     boolean outputEmpty;
 
-    public Split(String regex, boolean outputEmpty) {
-      this.pattern = Pattern.compile(regex);
+    public Split(Pattern pattern, boolean outputEmpty) {
+      this.pattern = pattern;
       this.outputEmpty = outputEmpty;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/690677d6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
index cd707da..ceebcca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.transforms;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -35,8 +37,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class RegexTest implements Serializable {
 
-  @Rule
-  public final transient TestPipeline p = TestPipeline.create();
+  @Rule public final transient TestPipeline p = TestPipeline.create();
 
   @Test
   @Category(NeedsRunner.class)
@@ -69,6 +70,42 @@ public class RegexTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testFindNameGroup() {
+    PCollection<String> output =
+        p.apply(Create.of("aj", "xj", "yj", "zj"))
+            .apply(Regex.find("(?<namedgroup>[xyz])", "namedgroup"));
+
+    PAssert.that(output).containsInAnyOrder("x", "y", "z");
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFindAllGroups() {
+    PCollection<List<String>> output =
+        p.apply(Create.of("aj", "xjx", "yjy", "zjz")).apply(Regex.findAll("([xyz])j([xyz])"));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            Arrays.asList("xjx", "x", "x"),
+            Arrays.asList("yjy", "y", "y"),
+            Arrays.asList("zjz", "z", "z"));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFindNameNone() {
+    PCollection<String> output =
+        p.apply(Create.of("a", "b", "c", "d"))
+            .apply(Regex.find("(?<namedgroup>[xyz])", "namedgroup"));
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testKVFind() {
 
     PCollection<KV<String, String>> output =
@@ -91,6 +128,30 @@ public class RegexTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testKVFindName() {
+
+    PCollection<KV<String, String>> output =
+        p.apply(Create.of("a b c"))
+            .apply(Regex.findKV("a (?<keyname>b) (?<valuename>c)", "keyname", "valuename"));
+
+    PAssert.that(output).containsInAnyOrder(KV.of("b", "c"));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testKVFindNameNone() {
+
+    PCollection<KV<String, String>> output =
+        p.apply(Create.of("x y z"))
+            .apply(Regex.findKV("a (?<keyname>b) (?<valuename>c)", "keyname", "valuename"));
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testMatches() {
 
     PCollection<String> output =
@@ -124,6 +185,45 @@ public class RegexTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testMatchesName() {
+
+    PCollection<String> output =
+        p.apply(Create.of("a", "x xxx", "x yyy", "x zzz"))
+            .apply(Regex.matches("x (?<namedgroup>[xyz]*)", "namedgroup"));
+
+    PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz");
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMatchesNameNone() {
+
+    PCollection<String> output =
+        p.apply(Create.of("a", "b", "c", "d"))
+            .apply(Regex.matches("x (?<namedgroup>[xyz]*)", "namedgroup"));
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAllMatches() {
+
+    PCollection<List<String>> output =
+        p.apply(Create.of("a x", "x x", "y y", "z z")).apply(Regex.allMatches("([xyz]) ([xyz])"));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            Arrays.asList("x x", "x", "x"),
+            Arrays.asList("y y", "y", "y"),
+            Arrays.asList("z z", "z", "z"));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testKVMatches() {
 
     PCollection<KV<String, String>> output =
@@ -145,6 +245,29 @@ public class RegexTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testKVMatchesName() {
+    // Full-match variant of testKVFindName: must exercise matchesKV, not findKV.
+    PCollection<KV<String, String>> output =
+        p.apply(Create.of("a b c"))
+            .apply(Regex.matchesKV("a (?<keyname>b) (?<valuename>c)", "keyname", "valuename"));
+
+    PAssert.that(output).containsInAnyOrder(KV.of("b", "c"));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testKVMatchesNameNone() {
+    // No-match counterpart for the full-match transform: must exercise matchesKV, not findKV.
+    PCollection<KV<String, String>> output =
+        p.apply(Create.of("x y z"))
+            .apply(Regex.matchesKV("a (?<keyname>b) (?<valuename>c)", "keyname", "valuename"));
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testReplaceAll() {
 
     PCollection<String> output =


[2/2] beam git commit: This closes #1732: Improve Regex

Posted by ke...@apache.org.
This closes #1732: Improve Regex

  Improvements to regex transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ea8135c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ea8135c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ea8135c

Branch: refs/heads/master
Commit: 8ea8135ca5f9300e9b816fe273ca6bb89b5fa415
Parents: 83c9831 690677d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 19:38:00 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jan 5 19:38:00 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Regex.java   | 589 +++++++++++++++++--
 .../apache/beam/sdk/transforms/RegexTest.java   | 127 +++-
 2 files changed, 679 insertions(+), 37 deletions(-)
----------------------------------------------------------------------